Removed duplicated code on S3ToRedshiftOperator (#18671)
mariotaddeucci authored Oct 8, 2021
1 parent b8efdac commit 22768ff
Showing 1 changed file with 0 additions and 24 deletions.
airflow/providers/amazon/aws/transfers/s3_to_redshift.py (24 changes: 0 additions & 24 deletions)
@@ -130,30 +130,6 @@ def _build_copy_query(self, copy_destination: str, credentials_block: str, copy_options
             {copy_options};
         """
 
-    def _get_table_primary_key(self, postgres_hook):
-        sql = """
-            select kcu.column_name
-            from information_schema.table_constraints tco
-            join information_schema.key_column_usage kcu
-                on kcu.constraint_name = tco.constraint_name
-                and kcu.constraint_schema = tco.constraint_schema
-                and kcu.constraint_name = tco.constraint_name
-            where tco.constraint_type = 'PRIMARY KEY'
-                and kcu.table_schema = %s
-                and kcu.table_name = %s
-        """
-
-        result = postgres_hook.get_records(sql, (self.schema, self.table))
-
-        if len(result) == 0:
-            raise AirflowException(
-                f"""
-                No primary key on {self.schema}.{self.table}.
-                Please provide keys on 'upsert_keys' parameter.
-                """
-            )
-        return [row[0] for row in result]
-
     def execute(self, context) -> None:
         postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
         conn = S3Hook.get_connection(conn_id=self.aws_conn_id)
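With 0 additions against 24 deletions, the change drops a second, identical definition of _get_table_primary_key that had been duplicated inside the class; one copy remains, so the operator's behavior is unchanged. For readers unfamiliar with what the method does, here is a minimal, self-contained sketch of the same primary-key lookup. It is an illustration under assumptions, not the Airflow source: the standalone function name and signature are hypothetical, and the original join's redundant duplicate constraint_name predicate is dropped.

    # Hedged sketch, not the Airflow source: a standalone version of the
    # primary-key lookup performed by _get_table_primary_key. The function
    # name and signature here are hypothetical.
    from typing import List

    from airflow.exceptions import AirflowException
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    def get_table_primary_key(hook: PostgresHook, schema: str, table: str) -> List[str]:
        """Return the primary-key column names of schema.table."""
        sql = """
            select kcu.column_name
            from information_schema.table_constraints tco
            join information_schema.key_column_usage kcu
                on kcu.constraint_name = tco.constraint_name
                and kcu.constraint_schema = tco.constraint_schema
            where tco.constraint_type = 'PRIMARY KEY'
                and kcu.table_schema = %s
                and kcu.table_name = %s
        """
        # get_records returns a list of row tuples, one row per key column.
        rows = hook.get_records(sql, (schema, table))
        if not rows:
            raise AirflowException(f"No primary key on {schema}.{table}.")
        return [row[0] for row in rows]

Judging from the error message in the removed copy, explicitly passed upsert_keys take precedence in the operator's UPSERT path, and this information_schema lookup is only the fallback, raising when the target table has no primary key.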
