Conditional branch documentation + example usage #71

Closed
russellbrooks opened this issue Dec 19, 2019 · 14 comments

Comments

@russellbrooks
Contributor

russellbrooks commented Dec 19, 2019

From the flowspec docstring:

- Conditional branch:
    self.next(self.if_true, self.if_false, condition='boolean_variable')
  In this situation, both `if_true` and `if_false` are methods in the current class
  decorated with the `@step` decorator and `boolean_variable` is a variable name
  in the current class that evaluates to True or False. The `if_true` step will be
  executed if the condition variable evaluates to True and the `if_false` step will
  be executed otherwise.

It'd be great to have this mentioned on Metaflow's website with an example.

On a related note, this capability makes it easier to introduce cycles into the DAG. The documentation mentions:

Metaflow infers a directed (typically acyclic) graph based on the transitions between step functions.

Acyclicity seems to be enforced by Metaflow's own validity checker, which is separate from the pylint run and so is not disabled by the --no-pylint flag. The documentation alludes to the possibility of a graph with cycles, but building one doesn't seem possible for now.

Example:

from metaflow import FlowSpec, step


class TestFlowConditional(FlowSpec):
    """
    A toy flow to mimic a hyperparameter tuning strategy.
    
    The flow performs the following steps:
    1) Load data.
    2) Generate hyperparameter candidates.
    3) Fan-out training over hyperparameter candidates to evaluate using foreach.
    4) Join results.
    5) Conditionally stop at max iterations or keep evaluating.
    """

    @step
    def start(self):
        # self.data = ...
        self._iteration = 0
        self._max_iteration = 15
        self._num_candidates = 3
        self.results = []
        self.next(self.generate_candidates)

    @step
    def generate_candidates(self):
        candidates = []
        for _ in range(self._num_candidates):
            candidate = {
                "hyperparameters": { 
                    # ... 
                },
            }
            candidates.append(candidate)
        self.candidates = candidates
        self._iteration += len(candidates)
        self.next(self.train, foreach='candidates')

    @step
    def train(self):
        hyperparams = self.input['hyperparameters']
        
        # ...
        
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Combine results for hyperparameter candidates.
        """
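        # Artifacts are not propagated through a join automatically, so carry
        # forward the counters that should_stop and generate_candidates read.
        self._iteration = inputs[0]._iteration
        self._max_iteration = inputs[0]._max_iteration
        self._num_candidates = inputs[0]._num_candidates
        # ...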
        self.next(self.should_stop)

    @step
    def should_stop(self):
        """
        Conditional branch to end when max iterations is reached, otherwise evaluate more candidates.
        """
        self._done = self._iteration >= self._max_iteration
        self.next(self.end, self.generate_candidates, condition='_done')

    @step
    def end(self):
        print("Finished.")

if __name__ == '__main__':
    TestFlowConditional()

Running

python TestMetaflowConditional.py --no-pylint run

results in:

Metaflow 2.0.1 executing TestFlowConditional for user:russell
Validating your flow...
    Validity checker found an issue on line 62:
    There is a loop in your flow: generate_candidates->train->join->should_stop->generate_candidates. Break the loop by fixing self.next() transitions.
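The loop is reported by Metaflow's own graph validation rather than by pylint, which is why --no-pylint doesn't switch it off. As an illustration of that kind of check (not Metaflow's actual validator code), the sketch below runs a depth-first search over the step transitions and reports the first cycle it finds. `find_loop` and the dict-of-lists graph representation are assumptions for illustration, with both branches of the conditional treated as edges.

```python
from typing import Dict, List, Optional


def find_loop(graph: Dict[str, List[str]], start: str = "start") -> Optional[List[str]]:
    """Return one cycle reachable from `start` as a list of step names, or None.

    Recursive depth-first search that tracks which steps are on the current
    path ("gray") and which are fully explored ("black").
    """
    state: Dict[str, str] = {}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        state[node] = "gray"
        path.append(node)
        for succ in graph.get(node, []):
            if state.get(succ) == "gray":
                # Back edge: the cycle is the part of the path starting at `succ`.
                return path[path.index(succ):] + [succ]
            if succ not in state:
                cycle = visit(succ)
                if cycle:
                    return cycle
        path.pop()
        state[node] = "black"
        return None

    return visit(start)


# Transitions of TestFlowConditional; should_stop has an edge to both targets.
flow_graph = {
    "start": ["generate_candidates"],
    "generate_candidates": ["train"],
    "train": ["join"],
    "join": ["should_stop"],
    "should_stop": ["end", "generate_candidates"],
    "end": [],
}

loop = find_loop(flow_graph)
if loop:
    print("There is a loop in your flow: " + "->".join(loop))
    # There is a loop in your flow: generate_candidates->train->join->should_stop->generate_candidates
```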

Keep up the great work and I've been enjoying Metaflow so far!

@savingoyal
Collaborator

@russellbrooks Please see this link. We don't formally support conditional branches yet. I will remove the code docstring in a PR soon.

@savingoyal
Collaborator

#72 addresses this concern.

@thesillystudent

@savingoyal Can this be reopened? I have raised a PR for it (#76), but I would need some help writing the tests and with anything else I might have missed.

@kgrvamsi

kgrvamsi commented Mar 6, 2021

@savingoyal I just want to check when we can expect the conditional functionality. I have a use case where I want to check whether an array's length is zero: if it is, end the flow; if it is greater than zero, proceed to the next step. I tried the following, but it didn't work. I think the right answer would be for Metaflow to support conditional flows.

from metaflow import FlowSpec, step


class SomeFlow(FlowSpec):
    """Skip straight to `end` when the array is empty."""

    @step
    def start(self):
        self.array = []
        # Not supported by Metaflow: the next step is chosen by a runtime
        # condition, i.e. a conditional branch.
        if len(self.array) == 0:
            self.next(self.end)
        else:
            self.next(self.some_value)

    @step
    def some_value(self):
        print(self.array)
        self.next(self.end)

    @step
    def end(self):
        print("End task")


if __name__ == '__main__':
    SomeFlow()


@savingoyal
Collaborator

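@kgrvamsi Yes, you would essentially have to do something like this:

from metaflow import FlowSpec, step


class SomeFlow(FlowSpec):
    @step
    def start(self):
        self.array = []
        # Always transition to the same step; the condition moves inside it.
        self.next(self.some_value)

    @step
    def some_value(self):
        if len(self.array) == 0:
            pass  # do something (or nothing) for the empty case
        else:
            print(self.array)
        self.next(self.end)

    @step
    def end(self):
        print("End task")


if __name__ == '__main__':
    SomeFlow()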

@savingoyal
Collaborator

@thesillystudent We would have to make sure that we support the resume functionality as well as AWS Step Functions with conditionals.

@sgaseretto

Are there any plans to add conditional branching in the near future? I think this is a key feature for workflows like simple automatic retraining (or continuous training), where you check whether a new model performs better than the old one and, if so, replace the old model with the new one.
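Until conditionals are supported, this retraining pattern can run every step unconditionally and keep the comparison inside a step, so the step that replaces the model simply does nothing when the new model doesn't win (the first of the workarounds quoted near the end of this thread). A minimal sketch of that decision logic in plain Python, assuming higher scores are better; `decide_promotion`, `Decision` and `min_gain` are hypothetical names, not Metaflow APIs:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Decision:
    promote: bool
    reason: str


def decide_promotion(new_score: float, old_score: Optional[float], min_gain: float = 0.0) -> Decision:
    """Decide whether a newly trained model should replace the current one.

    Assumes higher scores are better. `old_score` is None when no model has
    been deployed yet, in which case the new model is always promoted.
    """
    if old_score is None:
        return Decision(True, "no existing model")
    if new_score > old_score + min_gain:
        return Decision(True, f"improved by {new_score - old_score:.4f}")
    return Decision(False, "new model is not better")
```

Inside a flow, the promotion step would call `decide_promotion(...)` and only publish the new model when `promote` is true; `reason` is there so the outcome can be stored as an artifact and inspected later.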

@mikejmills

Bumping this thread.

Any sense when we might see this feature?

@tuulos
Collaborator

tuulos commented Apr 16, 2021

@mikejmills curious, what's your use case? I listed some commonly used workarounds here https://gitter.im/metaflow_org/community?at=6079df40a2ac0d38e7be28fc

@mikejmills

mikejmills commented Apr 19, 2021 via email

@ckrapu

ckrapu commented Aug 23, 2022

These workarounds work pretty well, but I've come across a use case that isn't covered yet. I have 10+ steps which should be executed in parallel with conditional execution (i.e., 2 or 3 of them may need to be skipped). Some of these steps have their own foreach branching before their results are joined downstream.

I tried manipulating the args to self.next with something like this:

@step
def troublesome_step(self):
  possible_next_steps = [self.step1, self.step2, self.step3, ....]
  
  # Here, self.ignored_steps might be ['step1'] or ['step1', 'step10'] 
  actual_steps = [step for step in possible_next_steps if step.__name__ not in self.ignored_steps]
  
  # Filter out steps that are not needed
  self.next(*actual_steps)

Unfortunately, the graph parser doesn't accept the list unpacking and throws the error "Step <my_step> specifies an invalid self.next() transition. Make sure the self.next() expression matches with one of the supported transition types."

Is there a trick to programmatically filter out undesired steps using something like this?

@savingoyal
Collaborator

@ckrapu Would the foreach construct be a better choice here?

@ckrapu

ckrapu commented Aug 23, 2022

> @ckrapu Would the foreach construct be a better choice here?

I think it could work, but the iterable I'd prefer to use in the foreach would be a list of step functions, as opposed to the more common list of strings or sequence of DataFrames. Otherwise, I suppose I could write a small wrapper that runs the correct step logic based on the string it's passed.
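The "small wrapper" idea might look like this in plain Python: a registry maps branch names to the work each branch does, the foreach fans out over the names that are not ignored, and the single branch step dispatches on its input. `BRANCHES`, `enabled_branches`, `dispatch` and the `run_step*` functions are hypothetical names; in a flow, the list from `enabled_branches(...)` would be stored as an artifact and passed via `foreach=`, and the branch step would call `dispatch(self.input, ...)`.

```python
from typing import Callable, Dict, List


# Hypothetical stand-ins for the work done by each former step.
def run_step1(context: dict) -> str:
    return "step1 done"


def run_step2(context: dict) -> str:
    return "step2 done"


def run_step3(context: dict) -> str:
    return "step3 done"


# Registry mapping branch names (foreach items) to branch logic.
BRANCHES: Dict[str, Callable[[dict], str]] = {
    "step1": run_step1,
    "step2": run_step2,
    "step3": run_step3,
}


def enabled_branches(ignored: List[str]) -> List[str]:
    """Names to fan out over with foreach; order follows BRANCHES."""
    return [name for name in BRANCHES if name not in ignored]


def dispatch(name: str, context: dict) -> str:
    """Run the branch selected by the foreach input (self.input in a flow)."""
    try:
        handler = BRANCHES[name]
    except KeyError:
        raise ValueError(f"unknown branch: {name!r}") from None
    return handler(context)
```

One trade-off: all branches now share one step and one downstream path, so branches that need their own nested foreach structure don't fit this shape directly. The list also must not end up empty when every branch is ignored.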

One high-level question I have is about the graph parsing (in particular the bit that occurs here): AFAICT, the arguments supplied to self.next have to be explicitly laid out, and we can't just supply an arbitrary sequence or iterable there. Does that seem right?
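A static parser can only follow `self.next` arguments that are written out as `self.<step>` attribute references, because Metaflow derives the graph from the flow's source code rather than by running the steps. The toy parser below illustrates why a starred argument leaves nothing to resolve; it is a hypothetical `next_targets` helper built on Python's standard `ast` module, not Metaflow's actual graph code.

```python
import ast

SOURCE = '''
class Flow:
    def explicit(self):
        self.next(self.step1, self.step2)

    def unpacked(self):
        self.next(*self.actual_steps)
'''


def next_targets(func: ast.FunctionDef):
    """Return the step names passed to self.next(...), or None if there is no
    such call or any argument is not a plain `self.<name>` reference."""
    for node in ast.walk(func):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "next"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == "self"
        ):
            names = []
            for arg in node.args:
                if (
                    isinstance(arg, ast.Attribute)
                    and isinstance(arg.value, ast.Name)
                    and arg.value.id == "self"
                ):
                    names.append(arg.attr)
                else:
                    # ast.Starred, a list, a local variable, ...: the targets
                    # are only known at runtime, so static parsing gives up.
                    return None
            return names
    return None


class_def = ast.parse(SOURCE).body[0]
for func in class_def.body:
    print(func.name, next_targets(func))
# explicit ['step1', 'step2']
# unpacked None
```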

@bbrandt

bbrandt commented Feb 21, 2023

The gitter.im link does not seem to take me to the message, but I searched and tracked down the message I think it is pointing to. Copying it here for reference:

Conditional branches are not supported today - a key reason is that workflows should be executable by other schedulers like Step Functions or Argo, which might not support conditionals and/or evaluation of arbitrary conditional expressions written in Python. So far, it hasn't been a major issue since there are some decent workarounds:

  1. you can add a conditional at the top of your @step, like if x, which simply makes the step a no-op if the conditional is false. You pay the cost of starting a no-op task (some tens of seconds) but it might not be a huge issue.
  2. you can model your conditional as a foreach: create a list of things you want to evaluate based on the conditional, like [x for x in all_things if x]. Note that the list can't be empty, so you need to include at least one task, which may be a no-op.
  3. if you have a larger subgraph under the conditional, you can model it as a separate flow. You can export the flows to Step Functions and have the first flow trigger the second, if the conditional is true. (We need to document this triggering logic better but technically it is doable today) (Related: Expand @schedule to trigger based on external events, such as changes to AWS S3 bucket #468, Composable Flows and Steps #245, Co-ordinating Multiple Flows #280)

Maybe one of these approaches could work for you.
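A minimal sketch of the second workaround in plain Python, assuming the conditional can be expressed as a per-item predicate; `foreach_items` and the `NOOP` marker are hypothetical names. The result would be assigned to an artifact and passed to `self.next(..., foreach=...)`, and the branch step would skip its work when `self.input == NOOP`.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")

# Hypothetical marker for "nothing to do". A plain string is used rather than
# object(), because the foreach list is stored as a (pickled) artifact, and an
# identity-based sentinel would not compare equal after unpickling.
NOOP = "__noop__"


def foreach_items(all_things: Sequence[T], keep: Callable[[T], bool]) -> List:
    """Return the items to fan out over; never empty.

    A foreach over an empty list is not allowed, so when nothing passes the
    filter, a single NOOP item is returned instead. The branch step should
    compare self.input with NOOP and skip its work.
    """
    selected = [x for x in all_things if keep(x)]
    return selected if selected else [NOOP]
```

This costs one extra no-op task when nothing qualifies, the same trade-off as the first workaround.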

Here is the new share link to the message:
https://matrix.to/#/!dxivTsLWyzHtfVZmST:gitter.im/$bMFy_bIzhxZSfBD7nzzgfRfclKP5vJhHWchfmWm6d28?via=gitter.im&via=matrix.org&via=tchncs.de
