Adding autocommit property for Redshift queries for Vacuums, etc. #2242

Merged
merged 5 commits into from
Sep 27, 2017
2 changes: 2 additions & 0 deletions luigi/contrib/postgres.py
@@ -349,6 +349,7 @@ class PostgresQuery(rdbms.Query):

Usage:
Subclass and override the required `host`, `database`, `user`, `password`, `table`, and `query` attributes.
Optionally one can override the `autocommit` attribute to put the connection for the query in autocommit mode.

Override the `run` method if your use case requires some action with the query result.

@@ -359,6 +360,7 @@ class PostgresQuery(rdbms.Query):

    def run(self):
        connection = self.output().connect()
        connection.autocommit = self.autocommit
        cursor = connection.cursor()
        sql = self.query

8 changes: 8 additions & 0 deletions luigi/contrib/rdbms.py
@@ -149,6 +149,10 @@ class Query(luigi.task.MixinNaiveBulkComplete, luigi.Task):
* `table`,
* `query`

Optionally override:

* `autocommit`

Subclass and override the following methods:

* `output`
@@ -178,6 +182,10 @@ def table(self):
    def query(self):
        return None

    @property
    def autocommit(self):
        return False

    @abc.abstractmethod
    def run(self):
        raise NotImplementedError("This method must be overridden")
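
The hunk above adds `autocommit` as a read-only property that defaults to `False`. The following is a minimal sketch of how a subclass can override such a default, either with a plain class attribute (the form used in this PR's test) or with its own property. `Query`, `VacuumQuery`, `ConditionalQuery`, and `maintenance` are hypothetical stand-ins for illustration, not the luigi classes.

```python
class Query:
    """Hypothetical stand-in for the base query task (not the luigi class)."""

    @property
    def autocommit(self):
        # Default: leave the connection in transactional mode.
        return False


class VacuumQuery(Query):
    # A plain class attribute on the subclass shadows the inherited property.
    autocommit = True


class ConditionalQuery(Query):
    def __init__(self, maintenance):
        self.maintenance = maintenance

    @property
    def autocommit(self):
        # Overriding with a property allows a per-instance decision.
        return self.maintenance


default_flag = Query().autocommit
attribute_flag = VacuumQuery().autocommit
property_flag = ConditionalQuery(maintenance=True).autocommit
```

The class-attribute form is the shortest when the flag is fixed for the task; the property form may be preferable when the value depends on task parameters.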
1 change: 1 addition & 0 deletions luigi/contrib/redshift.py
@@ -640,6 +640,7 @@ class RedshiftUnloadTask(postgres.PostgresQuery, _CredentialsMixin):

Usage:
Subclass and override the required `host`, `database`, `user`, `password`, `table`, and `query` attributes.
Optionally, override the `autocommit` attribute to run the query in autocommit mode. This is necessary to run VACUUM, for example.
Override the `run` method if your use case requires some action with the query result.
Task instances require a dynamic `update_id`, e.g. via parameter(s), otherwise the query will only execute once
To customize the query signature as recorded in the database marker table, override the `update_id` property.
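
A minimal sketch of why the flag matters for statements such as VACUUM. It uses hypothetical stand-ins (`FakeConnection`, `run_query`) rather than luigi or a real database driver, and it assumes two things: that the driver wraps statements in an implicit transaction unless autocommit is enabled, and that the server rejects VACUUM inside a transaction block.

```python
class FakeConnection:
    """Hypothetical stand-in for a database connection (not a real driver)."""

    def __init__(self):
        self.autocommit = False
        self.executed = []

    def execute(self, sql):
        # Assumption for illustration: outside autocommit mode, every
        # statement runs inside an implicit transaction.
        in_transaction = not self.autocommit
        if in_transaction and sql.lstrip().upper().startswith("VACUUM"):
            raise RuntimeError("VACUUM cannot run inside a transaction block")
        self.executed.append(sql)


def run_query(connection, sql, autocommit=False):
    """Set the flag first, then execute, in the same order as the diff."""
    connection.autocommit = autocommit
    connection.execute(sql)


# Default mode: the maintenance statement is rejected.
try:
    run_query(FakeConnection(), "VACUUM dummy_table")
    rejected = False
except RuntimeError:
    rejected = True

# Autocommit mode: the same statement goes through.
conn = FakeConnection()
run_query(conn, "VACUUM dummy_table", autocommit=True)
accepted = conn.executed == ["VACUUM dummy_table"]
```

The ordering is the point of the sketch: the flag has to be set on the connection before the first statement is executed.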
31 changes: 31 additions & 0 deletions test/contrib/redshift_test.py
@@ -284,3 +284,34 @@ def test_redshift_unload_command(self, mock_redshift_target):
"credentials 'aws_access_key_id=AWS_ACCESS_KEY;aws_secret_access_key=AWS_SECRET_KEY' "
"DELIMITER ',' ADDQUOTES GZIP ALLOWOVERWRITE PARALLEL OFF;"
)


class DummyRedshiftAutocommitQuery(luigi.contrib.redshift.RedshiftQuery):
    # Class attributes taken from `DummyPostgresImporter` in
    # `../postgres_test.py`.
    host = 'dummy_host'
    database = 'dummy_database'
    user = 'dummy_user'
    password = 'dummy_password'
    table = luigi.Parameter(default='dummy_table')
    autocommit = True

    def query(self):
        return "SELECT 'a' as col_a, current_date as col_b"


class TestRedshiftAutocommitQuery(unittest.TestCase):
    @mock.patch("luigi.contrib.redshift.RedshiftTarget")
    def test_redshift_autocommit_query(self, mock_redshift_target):

        task = DummyRedshiftAutocommitQuery()
        task.run()

        # The mocked connection returned by
        # RedshiftTarget.connect().
        mock_connect = (mock_redshift_target.return_value
                                            .connect
                                            .return_value)

        # Check that run() put the connection in autocommit mode.
        # A mock attribute that was never assigned is itself truthy,
        # so compare against True explicitly.
        self.assertIs(mock_connect.autocommit, True)
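
A short sketch of a pitfall when asserting on attributes of a mocked connection, using only `unittest.mock` from the standard library. An attribute that was never assigned on a `MagicMock` is auto-created as another `MagicMock`, which is truthy, so a truthiness check alone cannot tell whether the code under test actually set the flag. The variable names are illustrative.

```python
from unittest import mock

connection = mock.MagicMock()

# Never assigned: the attribute is auto-created as a MagicMock, which is truthy.
untouched = connection.autocommit
untouched_is_mock = isinstance(untouched, mock.MagicMock)
untouched_is_truthy = bool(untouched)

# After an explicit assignment the attribute holds the real value,
# so an identity comparison against True distinguishes the two cases.
connection.autocommit = True
assigned_is_true = connection.autocommit is True
```

An identity check such as `assertIs(value, True)` fails when the attribute was never set, whereas `assertTrue(value)` passes in both cases.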