-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sql to gcs with exclude columns #23695
Changes from 7 commits
c2a5e72
783390c
d68afc0
003aa8e
17ed3c2
d58e2fd
b7cc954
6a063f6
cf46ea8
812e7e5
458d138
0c511b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -19,7 +19,7 @@ | |||||||||||
import abc | ||||||||||||
import json | ||||||||||||
from tempfile import NamedTemporaryFile | ||||||||||||
from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union | ||||||||||||
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union | ||||||||||||
|
||||||||||||
import pyarrow as pa | ||||||||||||
import pyarrow.parquet as pq | ||||||||||||
|
@@ -71,6 +71,7 @@ class BaseSQLToGCSOperator(BaseOperator): | |||||||||||
If set as a sequence, the identities from the list must grant | ||||||||||||
Service Account Token Creator IAM role to the directly preceding identity, with first | ||||||||||||
account from the list granting this role to the originating account (templated). | ||||||||||||
:param exclude_columns: list of columns to exclude from transmission | ||||||||||||
""" | ||||||||||||
|
||||||||||||
template_fields: Sequence[str] = ( | ||||||||||||
|
@@ -103,6 +104,7 @@ def __init__( | |||||||||||
gcp_conn_id: str = 'google_cloud_default', | ||||||||||||
delegate_to: Optional[str] = None, | ||||||||||||
impersonation_chain: Optional[Union[str, Sequence[str]]] = None, | ||||||||||||
exclude_columns: List[str] = [], | ||||||||||||
**kwargs, | ||||||||||||
) -> None: | ||||||||||||
super().__init__(**kwargs) | ||||||||||||
|
@@ -120,6 +122,7 @@ def __init__( | |||||||||||
self.gcp_conn_id = gcp_conn_id | ||||||||||||
self.delegate_to = delegate_to | ||||||||||||
self.impersonation_chain = impersonation_chain | ||||||||||||
self.exclude_columns = exclude_columns | ||||||||||||
|
||||||||||||
def execute(self, context: 'Context'): | ||||||||||||
self.log.info("Executing query") | ||||||||||||
|
@@ -165,7 +168,9 @@ def _write_local_data_files(self, cursor): | |||||||||||
names in GCS, and values are file handles to local files that | ||||||||||||
contain the data for the GCS objects. | ||||||||||||
""" | ||||||||||||
schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) | ||||||||||||
org_schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) | ||||||||||||
schema = [column for column in org_schema if column not in self.exclude_columns] | ||||||||||||
|
||||||||||||
Comment on lines
+174
to
+176
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider casting
Suggested change
WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If schema is changed to set type, the order of the columns is changed. however, it seems good to receive the input value as a set to prevent duplication of the input exclude column. delegate_to: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
exclude_columns=None,
**kwargs,
) -> None:
super().__init__(**kwargs)
if exclude_columns is None:
exclude_columns = set() using exclude_columns as a set to prevent duplication There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good to me, consider There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @turbaszek |
||||||||||||
col_type_dict = self._get_col_type_dict() | ||||||||||||
file_no = 0 | ||||||||||||
|
||||||||||||
|
@@ -314,7 +319,11 @@ def _write_local_schema_file(self, cursor): | |||||||||||
schema = self.schema | ||||||||||||
else: | ||||||||||||
self.log.info("Starts generating schema") | ||||||||||||
schema = [self.field_to_bigquery(field) for field in cursor.description] | ||||||||||||
schema = [ | ||||||||||||
self.field_to_bigquery(field) | ||||||||||||
for field in cursor.description | ||||||||||||
if field[0] not in self.exclude_columns | ||||||||||||
] | ||||||||||||
|
||||||||||||
if isinstance(schema, list): | ||||||||||||
schema = json.dumps(schema, sort_keys=True) | ||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not use
[]
as default argument. UseNone
and in constructor doexclude_columns = exclude_columns or []
as we do in many places.https://florimond.dev/en/posts/2018/08/python-mutable-defaults-are-the-source-of-all-evil/