Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RDS operators/sensors #25952

Closed
2 tasks done
hankehly opened this issue Aug 25, 2022 · 12 comments · Fixed by #27076
Closed
2 tasks done

Add RDS operators/sensors #25952

hankehly opened this issue Aug 25, 2022 · 12 comments · Fixed by #27076
Assignees
Labels

Comments

@hankehly
Copy link
Contributor

hankehly commented Aug 25, 2022

Description

I think adding the following operators/sensors would benefit companies that need to start/stop RDS instances programmatically.

Name Description PR
RdsStartDbOperator Start an instance, and optionally wait for it enter "available" state #27076
RdsStopDbOperator Start an instance, and optionally wait for it to enter "stopped" state #27076
RdsDbSensor Wait for the requested status (eg. available, stopped) #26003

Is this something that would be accepted into the codebase?
Please let me know.

Use case/motivation

1. Saving money

RDS is expensive. To save money, a company keeps test/dev environment relational databases shutdown until it needs to use them. With Airflow, they can start a database instance before running a workload, then turn it off after the workload finishes (or errors).

2. Force RDS to stay shutdown

RDS automatically starts a database after 1 week of downtime. A company does not need this feature. They can create a DAG to continuously run the shutdown command on a list of databases instance ids stored in a Variable. The alternative is to create a shell script or login to the console and manually shutdown each database every week.

3. Making sure a database is running before scheduling workload

A company programmatically starts/stops its RDS instances. Before they run a workload, they want to make sure it's running. They can use a sensor to make sure a database is available before attempting to run any jobs that require access.

Also, during maintenance windows, RDS instances may be taken offline. Rather than tuning each DAG schedule to run outside of this window, a company can use a sensor to wait until the instance is available. (Yes, the availability check could also take place immediately before the maintenance window.)

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@hankehly hankehly added the kind:feature Feature Requests label Aug 25, 2022
@potiuk
Copy link
Member

potiuk commented Aug 25, 2022

Sure - go ahead - just make sure there are no similar operators there already,. And @o-nikolas @ferruzzi @vincbeck might provide their guidance too :)

@vincbeck
Copy link
Contributor

These operators do not exist and indeed, would be beneficial for the users. Feel free to add them in airflow/providers/amazon/aws/operators/rds.py. My only guidance would be to follow other RDS operators and to include documentation, unit tests and add them in the system test. I look forward to reviewing that

@o-nikolas
Copy link
Contributor

+1 from me as well!

@hankehly
Copy link
Contributor Author

Great, thanks for the feedback. I'll submit some PRs.

@kazanzhy
Copy link
Contributor

kazanzhy commented Sep 4, 2022

It will be great to keep consistency with these PRs:
#21231
#20907

I mean the db_type in RdsStartDbOperator, which can be either instance' or 'cluster'.
WDYT, @hankehly?

@hankehly
Copy link
Contributor Author

hankehly commented Sep 5, 2022

@kazanzhy Yes that sounds good to me. In that case, how about we rename the RdsInstanceSensor to RdsDbSensor.

kaxil pushed a commit that referenced this issue Sep 5, 2022
Related: #24099 #25952

### Summary

This PR sets the `template_fields` property on `RdsCreateDbInstanceOperator` and `RdsDeleteDbInstanceOperator` to allow users to programmatically change the database id, instance size, allocated storage, etc..

It also replaces use of `@task` decorated functions with their appropriate operator in system tests.


Co-authored-by: D. Ferruzzi <ferruzzi@amazon.com>
@hankehly
Copy link
Contributor Author

hankehly commented Sep 6, 2022

cc @dstandish @o-nikolas @ferruzzi @kazanzhy

I'd like someone's opinion please.

While editing at the operator/sensor code, I'm noticing a couple things:

  • some duplication in RdsBaseSensor / RdsBaseOperator around the _describe_item method
  • base classes are responsible for details of subclasses

The current implementation is a great start, but I think we could improve the design to reduce duplication. How about creating (private) reusable models of each RDS resource (ie db instance, cluster, snapshot, etc..) like in the following example?

# 1. Create a shared model class for RdsInstance
class RdsInstance:
    def __init__(self, hook, id):
        self.hook = hook
        self.id = id

    def check_status(self, target_statuses: List[str]) -> bool:
        instances = self.hook.conn.describe_db_instances(DBInstanceIdentifier=self.id)
        return instances[0]["DbInstanceStatus"] in target_statuses

    def start(self):
        pass # todo

    def stop(self):
        pass # todo


# 2. Create a shared model class for RdsCluster
class RdsCluster:
    def __init__(self, hook, id):
        self.hook = hook
        self.id = id

    def check_status(self, target_statuses: List[str]) -> bool:
        instances = self.hook.conn.describe_db_clusters(DBClusterIdentifier=self.id)
        return instances[0]["Status"] in target_statuses

    def start(self):
        pass # todo

    def stop(self):
        pass # todo


# 3. Use the models in sensor classes (reduce code duplication)
class RdsDbSensor(RdsBaseSensor):
    def _poke(self):
        return self._resource.check_status(self.target_statuses)

    @cached_property
    def _resource(self):
        if self.db_type == "instance":
            return RdbInstance(self.hook, self.db_identifier)
        return RdbCluster(self.hook, self.db_identifier)


# 4. Use the models in operator classes (reduce code duplication)
class RdsStartDbOperator(RdsBaseOperator):
    def _execute(self):
        self._resource.start()

    @cached_property
    def _resource(self):
        if self.db_type == "instance":
            return RdbInstance(self.hook, self.db_identifier)
        return RdbCluster(self.hook, self.db_identifier)

@vincbeck
Copy link
Contributor

vincbeck commented Sep 6, 2022

I am not strongly against this solution but I do not see any real value compare to what we what today. I understand the intent of de duplicating the if statement in _describe_item in both operator and sensor but on the other side you also add complexity by adding these new "resources". In my opinion, that would make things even more complicated to anyone who want to work on these operators/sensors.

That's only my humble opinion :)

@hankehly
Copy link
Contributor Author

hankehly commented Sep 7, 2022

Thanks @vincbeck for the feedback. I'll stick to the way it's implemented.

@o-nikolas
Copy link
Contributor

Hey @hankehly,

I see that the sensor PR was merged. Are you still working on the Start/Stop operators?

@hankehly
Copy link
Contributor Author

@o-nikolas Yes, I apologize for the delay. I have an implementation ready, just need to add tests. I'll have a PR open this weekend.

@o-nikolas
Copy link
Contributor

@o-nikolas Yes, I apologize for the delay. I have an implementation ready, just need to add tests. I'll have a PR open this weekend.

No worries at all! Thanks for sticking with it 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants