Skip to content

Commit

Permalink
Airflow on Kubernetes minus Foreachs.
Browse files Browse the repository at this point in the history
- Support for all metaflow construct without foreach and sensors

Squashed commit of the following:

commit ef8b1e3
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jul 29 01:06:26 2022 +0000

    Removed sernsors and banned foreach's

commit 8d517c4
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jul 29 00:59:01 2022 +0000

    commiting k8s related file from master.

commit a7e1ecd
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jul 29 00:54:45 2022 +0000

    Uncommented code for foreach support with k8s

    KubernetesPodOperator version 4.2.0 renamed `resources` to
    `container_resources`
    - Check : (apache/airflow#24673) /
    - (apache/airflow@45f4290)

    This was done because `KubernetesPodOperator` didn't play nice with dynamic task mapping and they had to deprecate the `resources` argument. Hence the below codepath checks for the version of `KubernetesPodOperator`
    and then sets the argument. If the version < 4.2.0 then we set the argument as `resources`.
    If it is > 4.2.0 then we set the argument as `container_resources`
    The `resources` argument of KuberentesPodOperator is going to be deprecated soon in the future.
    So we will only use it for `KuberentesPodOperator` version < 4.2.0
    The `resources` argument will also not work for foreach's.

commit 2719f5d
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jul 18 18:31:58 2022 +0000

    nit fixes :
    - fixing comments.
    - refactor some variable/function names.

commit 2079293
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jul 18 18:14:53 2022 +0000

    change `token` to `production_token`

commit 14aad5f
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jul 18 18:11:56 2022 +0000

    Refactored import Airflow Sensors.

commit b1472d5
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jul 18 18:08:41 2022 +0000

    new comment on `startup_timeout_seconds` env var.

commit 6d81b75
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jul 18 18:06:09 2022 +0000

    Removing traces of `@airflow_schedule_interval`

commit 0673db7
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Jul 14 12:43:08 2022 -0700

    Foreach polish (#62)

    * Removing unused imports
    * Added validation logic for airflow version numbers with foreaches
    * Removed `airflow_schedule_interval` decorator.

    * Added production/deployment token related changes
    - Uses s3 as a backend to store the production token
    - Token used for avoiding nameclashes
    - token stored via `FlowDatastore`

    * Graph type validation for airflow foreachs
    - Airflow foreachs only support single node fanout.
    - validation invalidates graphs with nested foreachs

    * Added configuration about startup_timeout.

    * Added final todo on `resources` argument of k8sOp
    - added a commented code block
    - it needs to be uncommented when airflow releasese the patch for the op
    - Code seems feature complete keeping aside airflow patch

commit 4b2dd12
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Jul 7 19:33:07 2022 +0000

    Removed retries from user-defaults.

commit 0e87a97
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Jul 6 16:29:33 2022 +0000

    updated pod startup time

commit fce2bd2
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Jun 29 18:44:11 2022 +0000

    Adding default 1 retry for any airflow worker.

commit 5ef6bbc
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 27 01:22:42 2022 +0000

    Airflow Foreach Integration
    - Simple one node foreach-join support as gaurenteed by airflow
    - Fixed env variable setting issue
    - introduced MetaflowKuberentesOperator
    - Created a new operator to allow smootness in plumbing xcom values
    - Some todos

commit d319fa9
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 24 21:12:09 2022 +0000

    simplifying run-id macro.

commit 0ffc813
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 24 11:51:42 2022 -0700

    Refactored parameter macro settings. (#60)

commit a3a4950
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 24 02:05:57 2022 +0000

    added comment on need for `start_date`

commit a3147be
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Jun 21 06:03:56 2022 +0000

    Refactored an `id_creator` method.

commit 04d7f20
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Jun 21 05:52:05 2022 +0000

    refactor :
    -`RUN_ID_LEN` to `RUN_HASH_ID_LEN`
    - `TASK_ID_LEN` to `TASK_ID_HASH_LEN`

commit cde4605
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Jun 21 05:48:55 2022 +0000

    refactored an error string

commit 1145818
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 20 22:42:36 2022 -0700

    addressing  savins comments. (#59)

    - Added many adhoc changes based for some comments.
    - Integrated secrets and `KUBERNETES_SECRETS`
    - cleaned up parameter setting
    - cleaned up setting of scheduling interval
    - renamed `AIRFLOW_TASK_ID_TEMPLATE_VALUE` to `AIRFLOW_TASK_ID`
    - renamed `AirflowSensorDecorator.compile` to `AirflowSensorDecorator.validate`
    - Checking if dagfile and flow file are same.
    - fixing variable names.
    - checking out `kubernetes_decorator.py` from master (6441ed5)
    - bug fixing secret setting in airflow.
    - simplified parameter type parsing logic
    - refactoring airflow argument parsing code.

commit 83b20a7
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 13 14:02:57 2022 -0700

    Addressing Final comments.  (#57)

    - Added dag-run timeout.
    - airflow related scheduling checks in decorator.
    - Auto naming sensors if no name is provided
    - Annotations to k8s operators
    - fix: argument serialization for `DAG` arguments (method names refactored like `to_dict` became `serialize`)
    - annotation bug fix
    - setting`workflow-timeout` for only scheduled dags

commit 4931f9c
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 6 04:50:49 2022 +0000

    k8s bug fix

commit 200ae8e
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 6 04:39:50 2022 +0000

    removed un-used function

commit 70e285e
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 6 04:38:37 2022 +0000

    Removed unused `sanitize_label` function

commit 84fc622
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 6 04:37:34 2022 +0000

    GPU support added + container naming same as argo

commit c92280d
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 6 04:25:17 2022 +0000

    Refactored sensors to different files + bug fix
    - bug caused due `util.compress_list`.
    - The function doesn't play nice with strings with variety of characters.
    - Ensured that exceptions are handled appropriately.
    - Made new file for each sensor under `airflow.sensors` module.

commit b72a1dc
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Jun 4 01:41:49 2022 +0000

    ran black.

commit 558c82f
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 18:32:48 2022 -0700

    Moving information from airflow_utils to compiler (#56)

    - commenting todos to organize unfinished changes.
    - some environment variables set via`V1EnvVar`
        - `client.V1ObjectFieldSelector` mapped env vars were not working in json form
        - Moving k8s operator import into its own function.
        - env vars moved.

commit 9bb5f63
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 18:06:03 2022 +0000

    added mising Run-id prefixes to variables.
    - merged `hash` and `dash_connect` filters.

commit 37b5e6a
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 18:00:22 2022 +0000

    nit fix : variable name change.

commit 660756f
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 17:58:34 2022 +0000

    nit fixes to dag.py's templating variables.

commit 1202f5b
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 17:56:53 2022 +0000

    Fixed defaults passing
    - Addressed comments for airflow.py

commit b9387dd
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 17:52:24 2022 +0000

    Following Changes:
    - Refactors setting scheduling interval
    - refactor dag file creating function
    - refactored is_active to is_paused_upon_creation
    - removed catchup

commit 054e3f3
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 17:33:25 2022 +0000

    Multiple Changes based on comments:
    1. refactored `create_k8s_args` into _to_job
    2. Addressed comments for snake casing
    3. refactored `attrs` for simplicity.
    4. refactored `metaflow_parameters` to `parameters`.
    5. Refactored setting of `input_paths`

commit d481b2f
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 16:42:24 2022 +0000

    Removed Sensor metadata extraction.

commit d8e6ec0
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Jun 3 16:30:34 2022 +0000

    porting savin's comments
    - next changes : addressing comments.

commit 3f2353a
Merge: d370ffb c1ff469
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Jul 28 23:52:16 2022 +0000

    Merge branch 'master' into airflow

commit d370ffb
Merge: a82f144 e4eb751
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Jul 14 19:38:48 2022 +0000

    Merge branch 'master' into airflow

commit a82f144
Merge: bdb1f0d 6f097e3
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Jul 13 00:35:49 2022 +0000

    Merge branch 'master' into airflow

commit bdb1f0d
Merge: 8511215 f9a4968
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Jun 29 18:44:51 2022 +0000

    Merge branch 'master' into airflow

commit 8511215
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Jun 21 02:53:11 2022 +0000

    Bug fix from master merge.

commit 90c06f1
Merge: 0fb73af 6441ed5
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Jun 20 21:20:20 2022 +0000

    Merge branch 'master' into airflow

commit 0fb73af
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Jun 4 00:53:10 2022 +0000

    squashing bugs after changes from master.

commit 09c6ba7
Merge: 7bdf662 ffff49b
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Jun 4 00:20:38 2022 +0000

    Merge branch 'master' into af-mmr

commit 7bdf662
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon May 16 17:42:38 2022 -0700

    Airflow sensor api (#3)

    * Fixed run-id setting
    - Change gaurentees that multiple dags triggered at same moment have unique run-id

    * added allow multiple in `Decorator` class

    * Airflow sensor integration.
     >> support added for :
    - ExternalTaskSensor
    - S3KeySensor
    - SqlSensor
    >> sensors allow multiple decorators
    >> sensors accept those arguments which are supported by airflow

    * Added `@airflow_schedule_interval` decorator
    * Fixing bug run-id related in env variable setting.

commit 2604a29
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 21 18:26:59 2022 +0000

    Addressed comments.

commit 584e88b
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:33:55 2022 +0000

    fixed printing bug

commit 169ac15
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:30:59 2022 +0000

    Option help bug fix.

commit 6f8489b
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:25:54 2022 +0000

    variable renamemetaflow_specific_args

commit 0c779ab
Merge: d299b13 5a61508
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:23:10 2022 +0000

    Merge branch 'airflow-tests' into airflow

commit 5a61508
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:22:54 2022 +0000

    Removing un-used code / resolved-todos.

commit d030830
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:06:03 2022 +0000

    ran black,

commit 2d1fc06
Merge: f2cb319 7921d13
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:04:19 2022 +0000

    Merge branch 'master' into airflow-tests

commit d299b13
Merge: f2cb319 7921d13
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 03:02:37 2022 +0000

    Merge branch 'master' into airflow

commit f2cb319
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 02:54:03 2022 +0000

    reverting change.

commit 05b9db9
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Apr 20 02:47:41 2022 +0000

    3 changes:
    - Removing s3 dep
    - remove uesless import
    - added `deployed_on` in dag file template

commit c6afba9
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Apr 15 22:50:52 2022 +0000

    Fixed passing secrets with kubernetes.

commit c3ce7e9
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Apr 15 22:04:22 2022 +0000

    Refactored code .
    - removed compute/k8s.py
    - Moved k8s code to airflow_compiler.py
    - ran isort to airflow_compiler.py

commit d1c343d
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Apr 15 18:02:25 2022 +0000

    Added validations about:
    - un-supported decorators
    - foreach
    Changed where validations are done to not save the package.

commit 7b19f8e
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Apr 15 03:34:26 2022 +0000

    Fixing mf log related bug
    - No double logging on metaflow.

commit 4d1f6bf
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 03:10:51 2022 +0000

    Removed usless code WRT project decorator.

commit 5ad9a39
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 03:03:19 2022 +0000

    Remove readme.

commit 60cb6a7
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 03:02:38 2022 +0000

    Made file path required arguement.

commit 9f0dc1b
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 03:01:07 2022 +0000

    changed `--is-active`->`--is-paused-upon-creation`
    - dags are active by default.

commit 5b98f93
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 02:55:46 2022 +0000

    shortened length of run-id and task-id hashes.

commit e53426e
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 02:41:32 2022 +0000

    Removing un-used args.

commit 72cbbfc
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 02:39:59 2022 +0000

    Moved exceptions to airflow compiler

commit b2970dd
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 14 02:33:02 2022 +0000

    Changes based on PR comments:
    - removed airflow xcom push file , moved to decorator code
    - removed prefix configuration
    - nit fixes.

commit 9e622ba
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Apr 11 20:39:00 2022 +0000

    Removing un-used code paths + code cleanup

commit 7425f62
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Apr 11 19:45:04 2022 +0000

    Fixing bug fix in schedule.

commit eb775cb
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Apr 10 02:52:59 2022 +0000

    Bug fixes WRT Kubernetes secrets + k8s deployments.
    - Fixing some error messages.
    - Added some comments.

commit 04c92b9
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Apr 10 01:20:53 2022 +0000

    Added secrets support.

commit 4a0a85d
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Apr 10 00:11:46 2022 +0000

    Bug fix.

commit af91099
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Apr 10 00:03:34 2022 +0000

    bug fix.

commit c17f04a
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Apr 9 23:55:41 2022 +0000

    Bug fix in active defaults.

commit 0d37236
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Apr 9 23:54:02 2022 +0000

    @project, @schedule, default active dag support.
    - Added a flag to allow setting dag as active on creation
    - Airflow compatible schedule interval
    - Project name fixes.

commit 5c97b15
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 7 21:15:18 2022 +0000

    Max workers and worker pool support.

commit 9c973f2
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Apr 7 19:34:33 2022 +0000

    Adding exceptions for missing features.

commit 2a946e2
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Mon Mar 28 19:34:11 2022 +0000

    2 changes :
    - removed hacky line
    - added support to directly throw dags in s3.

commit e0772ec
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Mar 23 22:38:20 2022 +0000

    fixing bugs in service account setting

commit 874b94a
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Mar 20 23:49:15 2022 +0000

    Added support for Branching with Airflow
    - remove `next` function in `AirflowTask`
    - `AirflowTask`s has no knowledge of next tasks.
    - removed todos + added some todos
    - Graph construction on airflow side using graph_structure datastructure.
    - graph_structure comes from`FlowGraph.output_steps()[1]`

commit 8e9f649
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Mar 20 02:33:04 2022 +0000

    Added hacky line

commit fd5db04
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Mar 20 02:06:38 2022 +0000

    Removed hacky line.

commit 5b23eb7
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Mar 20 01:44:57 2022 +0000

    Added support for Parameters.
    - Supporting int, str, bool, float, JSONType

commit c9378e9
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sun Mar 20 00:14:10 2022 +0000

    Removed todos + added some validation logic.

commit 7250a44
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 23:45:15 2022 +0000

    Fixing logs related change from master.

commit d125978
Merge: 8cdac53 7e210a2
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 23:42:48 2022 +0000

    Merge branch 'master' into aft-mm

commit 8cdac53
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 23:36:47 2022 +0000

    making changes sync with master

commit 5a93d9f
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 23:29:47 2022 +0000

    Fixed bug when using catch + retry

commit 62bc8df
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 22:58:37 2022 +0000

    Changed retry setting.

commit 563a200
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 22:42:57 2022 +0000

    Fixed setting `task_id` :
    - switch task-id from airflow job is to hash to "runid/stepname"
    - refactor xcom setting variables
    - added comments

commit e2a1e50
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Mar 19 17:51:59 2022 +0000

    setting retry logic.

commit a697b56
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Mar 17 01:02:11 2022 +0000

    Nit fix.

commit 68f13be
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Mar 16 20:46:19 2022 +0000

    Added @schedule support + readme

commit 57bdde5
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Mar 15 19:47:06 2022 +0000

    Fixed setting run-id / task-id to labels in k8s
    - Fixed setting run-id has from cli macro
    - added hashing macro to ensure that jinja template set the correct run-id to k8s labels
    -

commit 3d6c319
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Mar 15 05:39:04 2022 +0000

    Got linear workflows working on airflow.
    - Still not feature complete as lots of args are still unfilled / lots of unknows
    - minor tweek in eks to ensure airflow is k8s compatible.
    - passing state around via xcom-push
    - HACK : AWS keys are passed in a shady way. : Reverse this soon.

commit db074b8
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Fri Mar 11 12:34:33 2022 -0800

    Tweeks

commit a9f0468
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Tue Mar 1 17:14:47 2022 -0800

    some changes based on savin's comments.
    - Added changes to task datastore for different reason : (todo) Decouple these
    - Added comments to SFN for reference.
    - Airflow DAG is no longer dependent on metaflow

commit f32d089
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Wed Feb 23 00:54:17 2022 -0800

    First version of dynamic dag compiler.
    - Not completely finished code
    - Creates generic .py file a JSON that is parsed to create Airflow DAG.
    - Currently only boiler plate to make a linear dag but doesn't execute anything.
    -  Unfinished code.

commit d2def66
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Sat Feb 19 14:01:47 2022 -0800

    more tweeks.

commit b176311
Author: Valay Dave <valaygaurang@gmail.com>
Date:   Thu Feb 17 09:04:29 2022 -0800

    commit 0
    - unfinished code.
  • Loading branch information
valayDave committed Jul 29, 2022
1 parent ca29bec commit 52d8524
Show file tree
Hide file tree
Showing 11 changed files with 1,836 additions and 12 deletions.
41 changes: 30 additions & 11 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class Decorator(object):

name = "NONAME"
defaults = {}
# `allow_multiple` allows setting many decorators of the same type to a step/flow.
allow_multiple = False

def __init__(self, attributes=None, statically_defined=False):
self.attributes = self.defaults.copy()
Expand Down Expand Up @@ -226,9 +228,6 @@ class MyDecorator(StepDecorator):
pass them around with every lifecycle call.
"""

# `allow_multiple` allows setting many decorators of the same type to a step.
allow_multiple = False

def step_init(
self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
Expand Down Expand Up @@ -374,12 +373,17 @@ def _base_flow_decorator(decofunc, *args, **kwargs):
if isinstance(cls, type) and issubclass(cls, FlowSpec):
# flow decorators add attributes in the class dictionary,
# _flow_decorators.
if decofunc.name in cls._flow_decorators:
if decofunc.name in cls._flow_decorators and not decofunc.allow_multiple:
raise DuplicateFlowDecoratorException(decofunc.name)
else:
cls._flow_decorators[decofunc.name] = decofunc(
attributes=kwargs, statically_defined=True
)
deco_instance = decofunc(attributes=kwargs, statically_defined=True)
if decofunc.allow_multiple:
if decofunc.name not in cls._flow_decorators:
cls._flow_decorators[decofunc.name] = [deco_instance]
else:
cls._flow_decorators[decofunc.name].append(deco_instance)
else:
cls._flow_decorators[decofunc.name] = deco_instance
else:
raise BadFlowDecoratorException(decofunc.name)
return cls
Expand Down Expand Up @@ -469,11 +473,26 @@ def _attach_decorators_to_step(step, decospecs):
def _init_flow_decorators(
flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options
):
# Certain decorators can be specified multiple times and exist as lists in the _flow_decorators dictionary
for deco in flow._flow_decorators.values():
opts = {option: deco_options[option] for option in deco.options}
deco.flow_init(
flow, graph, environment, flow_datastore, metadata, logger, echo, opts
)
if type(deco) == list:
for rd in deco:
opts = {option: deco_options[option] for option in rd.options}
rd.flow_init(
flow,
graph,
environment,
flow_datastore,
metadata,
logger,
echo,
opts,
)
else:
opts = {option: deco_options[option] for option in deco.options}
deco.flow_init(
flow, graph, environment, flow_datastore, metadata, logger, echo, opts
)


def _init_step_decorators(flow, graph, environment, flow_datastore, logger):
Expand Down
9 changes: 9 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ def from_conf(name, default=None):
)
#

##
# Airflow Configuration
##
# This configuration sets `startup_timeout_seconds` in airflow's KubernetesPodOperator.
AIRFLOW_KUBERNETES_STARTUP_TIMEOUT = from_conf(
"METAFLOW_AIRFLOW_KUBERNETES_STARTUP_TIMEOUT_SECONDS", 60 * 60
)


###
# Conda configuration
###
Expand Down
11 changes: 10 additions & 1 deletion metaflow/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def get_plugin_cli():
from .aws.batch import batch_cli
from .kubernetes import kubernetes_cli
from .aws.step_functions import step_functions_cli
from .airflow import airflow_cli
from .argo import argo_workflows_cli
from .cards import card_cli
from . import tag_cli
Expand All @@ -98,6 +99,7 @@ def get_plugin_cli():
card_cli.cli,
kubernetes_cli.cli,
step_functions_cli.cli,
airflow_cli.cli,
argo_workflows_cli.cli,
tag_cli.cli,
]
Expand All @@ -121,6 +123,7 @@ def get_plugin_cli():
from .conda.conda_step_decorator import CondaStepDecorator
from .cards.card_decorator import CardDecorator
from .frameworks.pytorch import PytorchParallelDecorator
from .airflow.airflow_decorator import AirflowInternalDecorator


STEP_DECORATORS = [
Expand All @@ -137,6 +140,7 @@ def get_plugin_cli():
ParallelDecorator,
PytorchParallelDecorator,
InternalTestUnboundedForeachDecorator,
AirflowInternalDecorator,
ArgoWorkflowsInternalDecorator,
]
_merge_lists(STEP_DECORATORS, _ext_plugins["STEP_DECORATORS"], "name")
Expand All @@ -161,7 +165,12 @@ def get_plugin_cli():
from .aws.step_functions.schedule_decorator import ScheduleDecorator
from .project_decorator import ProjectDecorator

FLOW_DECORATORS = [CondaFlowDecorator, ScheduleDecorator, ProjectDecorator]

FLOW_DECORATORS = [
CondaFlowDecorator,
ScheduleDecorator,
ProjectDecorator,
]
_merge_lists(FLOW_DECORATORS, _ext_plugins["FLOW_DECORATORS"], "name")

# Cards
Expand Down
Empty file.
Loading

0 comments on commit 52d8524

Please sign in to comment.