From 443db12a9acef0179e28621ee7c2f3c441fefa52 Mon Sep 17 00:00:00 2001 From: Mario Taddeucci Date: Mon, 1 Nov 2021 22:01:30 -0300 Subject: [PATCH 1/2] Fix S3 to redshift operator --- .../amazon/aws/transfers/s3_to_redshift.py | 25 ++++++++----------- .../aws/transfers/test_s3_to_redshift.py | 6 ++--- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py index c0a1a7c647351..5f5eb38a0173f 100644 --- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py +++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py @@ -149,12 +149,7 @@ def execute(self, context) -> None: copy_statement = self._build_copy_query(copy_destination, credentials_block, copy_options) if self.method == 'REPLACE': - sql = f""" - BEGIN; - DELETE FROM {destination}; - {copy_statement} - COMMIT - """ + sql = ["BEGIN;", f"DELETE FROM {destination};", copy_statement, "COMMIT"] elif self.method == 'UPSERT': keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema) if not keys: @@ -162,14 +157,16 @@ def execute(self, context) -> None: f"No primary key on {self.schema}.{self.table}. Please provide keys on 'upsert_keys'" ) where_statement = ' AND '.join([f'{self.table}.{k} = {copy_destination}.{k}' for k in keys]) - sql = f""" - CREATE TABLE {copy_destination} (LIKE {destination}); - {copy_statement} - BEGIN; - DELETE FROM {destination} USING {copy_destination} WHERE {where_statement}; - INSERT INTO {destination} SELECT * FROM {copy_destination}; - COMMIT - """ + + sql = [ + f"CREATE TABLE {copy_destination} (LIKE {destination});", + copy_statement, + "BEGIN;", + f"DELETE FROM {destination} USING {copy_destination} WHERE {where_statement};", + f"INSERT INTO {destination} SELECT * FROM {copy_destination};", + "COMMIT", + ] + else: sql = copy_statement diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py index ff03165f7b7c0..0f4327312d9fc 100644 --- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py +++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py @@ -170,7 +170,7 @@ def test_deprecated_truncate(self, mock_run, mock_session, mock_connection, mock {copy_statement} COMMIT """ - assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction) + assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0].join("\n"), transaction) assert mock_run.call_count == 1 @@ -222,7 +222,7 @@ def test_replace(self, mock_run, mock_session, mock_connection, mock_hook): {copy_statement} COMMIT """ - assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction) + assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0].join("\n"), transaction) assert mock_run.call_count == 1 @@ -277,7 +277,7 @@ def test_upsert(self, mock_run, mock_session, mock_connection, mock_hook): INSERT INTO {schema}.{table} SELECT * FROM #{table}; COMMIT """ - assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0], transaction) + assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0].join("\n"), transaction) assert mock_run.call_count == 1 From 0cd5febb0fd3b86388e7ef577f6c5b57317b4be4 Mon Sep 17 00:00:00 2001 From: Mario Taddeucci Date: Mon, 1 Nov 2021 22:47:29 -0300 Subject: [PATCH 2/2] Update test_s3_to_redshift.py --- tests/providers/amazon/aws/transfers/test_s3_to_redshift.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py index 0f4327312d9fc..007ed87fd5327 100644 --- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py +++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py @@ -170,7 +170,7 @@ def test_deprecated_truncate(self, mock_run, mock_session, mock_connection, mock {copy_statement} COMMIT """ - assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0].join("\n"), transaction) + assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction) assert mock_run.call_count == 1 @@ -222,7 +222,7 @@ def test_replace(self, mock_run, mock_session, mock_connection, mock_hook): {copy_statement} COMMIT """ - assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0].join("\n"), transaction) + assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction) assert mock_run.call_count == 1 @@ -277,7 +277,7 @@ def test_upsert(self, mock_run, mock_session, mock_connection, mock_hook): INSERT INTO {schema}.{table} SELECT * FROM #{table}; COMMIT """ - assert_equal_ignore_multiple_spaces(self, mock_run.call_args[0][0].join("\n"), transaction) + assert_equal_ignore_multiple_spaces(self, "\n".join(mock_run.call_args[0][0]), transaction) assert mock_run.call_count == 1