Deferrable mode for EKS Create/Delete Operator#32355
Conversation
|
We just merged #31712 we should likely follow the way it is implemented. @Lee-W : I think we might want to add a pre-commit with some AST parsing likely - to add a check for providers that if they are using We have a few pre-commits that use AST to inspect parsed code so you could take it as an example. |
ferruzzi
left a comment
There was a problem hiding this comment.
One change required, and a few questions and nitpicks.
There was a problem hiding this comment.
nitpick
| def execute_failed(self, context, event=None): | |
| def execute_failed(self, context: Context, event: dict[str, Any] | None=None) -> None: |
There was a problem hiding this comment.
I've been using the function definition of the execute_* method from the official docs, which do it like this
There was a problem hiding this comment.
Got it. Not sure whether we should update the doc there 🤔 Let me send a PR to see how others thinkg
There was a problem hiding this comment.
I saw that your PR got merged. I added the change here, but I was having issues with mypy which didn't like the fact that event could be None:
airflow/providers/amazon/aws/operators/eks.py:766: error: Value of type
"Optional[Dict[str, Any]]" is not indexable [index]
if event["status"] == "success":
This can be solved by checking to make sure event is not None everytime:
if event and event["status"] == "success":
or remove None as an option and set the default to {}.
The last option is to simply ignore mypy, which is something I would rather avoid if possible.
I'm going with removing None as an option, but let me know if you think another option is more reasonable.
There was a problem hiding this comment.
Sorry, I didn't notice this reply, but I think we agree to use the solution on #32355 (comment)
c042ee6 to
a79c229
Compare
| subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), | ||
| ) | ||
|
|
||
| def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any] = {}) -> None: |
There was a problem hiding this comment.
I'm unsure whether we should use {} as the default value. I believe it's generally not a good idea to use mutable object as default
There was a problem hiding this comment.
hmm that's a good point. I guess I can set it to None and do a check for None before indexing it i.e.
if event and event["status"]
Its going to be tedious to do it everywhere, but I don't see a better option
There was a problem hiding this comment.
Yep, I think this is the best option we have as of now.
| ) | ||
| self.log.info(SUCCESS_MSG.format(compute=FARGATE_FULL_NAME)) | ||
|
|
||
| def execute_complete(self, context: Context, event: dict[str, Any] = {}) -> None: |
| subnets=cast(List[str], self.resources_vpc_config.get("subnetIds")), | ||
| ) | ||
|
|
||
| def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any] = {}) -> None: |
There was a problem hiding this comment.
Yep, I think this is the best option we have as of now.
There was a problem hiding this comment.
Sorry, I didn't notice this reply, but I think we agree to use the solution on #32355 (comment)
Lee-W
left a comment
There was a problem hiding this comment.
LGTM. Just left one minor nitpick.
This PR adds
deferrablemode to theEksCreateClusterOperatorandEksDeleteClusterOperator. The associated unit tests are also included.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.