Implement HITLBranchOperator #52349

@Lee-W

Why

AIP-90

What

Implement HITLBranchOperator.execute_complete

How

Implementation

Now that we have

class HITLBranchOperator(HITLOperator):

in the main branch, we can start implementing its

def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
    raise NotImplementedError

method.

We should first retrieve the user's response from the parent class, like

def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
    ret = super().execute_complete(context=context, event=event)

and then read the chosen options from ret["chosen_options"].

The class itself needs to inherit from BranchMixIn and set the class variable inherits_from_skipmixin = True.

We'll end up with something like the following:

class HITLBranchOperator(HITLOperator, BranchMixIn):
    inherits_from_skipmixin = True

    def execute_complete(self, context: Context, event: dict[str, Any]) -> Any:
        ret = super().execute_complete(context=context, event=event)
        chosen_options = ret["chosen_options"]
        return self.do_branch(context=context, branches_to_execute=chosen_options)
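The control flow above can be tried out without an Airflow installation using the rough sketch below. StubHITLOperator and StubBranchMixIn are hypothetical stand-ins written only for this illustration, not the real HITLOperator or BranchMixIn, and the shape of the event dict is an assumption. The sketch only shows how the parent's response feeds into do_branch under the proposed inheritance order.

```python
from typing import Any


class StubHITLOperator:
    """Hypothetical stand-in for HITLOperator (not the real Airflow class)."""

    def execute_complete(self, context: dict[str, Any], event: dict[str, Any]) -> dict[str, Any]:
        # Assumption: the user's response exposes a "chosen_options" list.
        return {"chosen_options": list(event["chosen_options"])}


class StubBranchMixIn:
    """Hypothetical stand-in for BranchMixIn (not the real Airflow class)."""

    def do_branch(self, context: dict[str, Any], branches_to_execute: list[str]) -> list[str]:
        # The real mixin skips the branches that were not chosen; this stub
        # only records which branches would be followed.
        context["followed"] = list(branches_to_execute)
        return branches_to_execute


class SketchHITLBranchOperator(StubHITLOperator, StubBranchMixIn):
    inherits_from_skipmixin = True

    def execute_complete(self, context: dict[str, Any], event: dict[str, Any]) -> Any:
        # Same three steps as in the proposal: get the response, read the
        # chosen options, and hand them to do_branch.
        ret = super().execute_complete(context=context, event=event)
        chosen_options = ret["chosen_options"]
        return self.do_branch(context=context, branches_to_execute=chosen_options)


context: dict[str, Any] = {}
result = SketchHITLBranchOperator().execute_complete(
    context=context,
    event={"chosen_options": ["task_a"]},  # hypothetical task ID
)
```

Because the chosen options are passed straight to do_branch as branches_to_execute, the options offered to the user presumably need to correspond to downstream task IDs.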

After it's implemented and tested, we'll need to add test cases here: https://github.com/apache/airflow/blob/main/providers/standard/tests/unit/standard/operators/test_hitl.py

An example DAG for all HITL operators will be handled by @Lee-W in another PR.

End-to-end test

Please refer to #52868 (comment) and change the operator to HITLBranchOperator.

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.

Metadata

Labels

kind:feature (Feature Requests), kind:meta (High-level information important to the community)

Projects

Status

Done
