Skip to content

Commit

Permalink
allow bq base cursor methods run_extract, run_copy, run_load to all
Browse files Browse the repository at this point in the history
take in source or destination table strings that include projects.

For backwards compatibility reasons, the project is not required.

This allows for decoupling of the execution of these methods from
projects that have the information they access.
  • Loading branch information
mtagle committed Mar 9, 2016
1 parent cbf139c commit ab73b70
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 55 deletions.
89 changes: 57 additions & 32 deletions airflow/contrib/hooks/bigquery_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def run_query(self, bql, destination_dataset_table = False, write_disposition =

return self.run_with_configuration(configuration)

def run_extract(self, source_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=',', print_header=True):
def run_extract(self, source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=',', print_header=True):
"""
Executes a BigQuery extract command to copy data from BigQuery to
Google Cloud Storage. See here:
Expand All @@ -206,8 +206,8 @@ def run_extract(self, source_dataset_table, destination_cloud_storage_uris, comp
For more details about these parameters.
:param source_dataset_table: The dotted <dataset>.<table> BigQuery table to use as the source data.
:type source_dataset_table: string
:param source_project_dataset_table: The dotted <dataset>.<table> BigQuery table to use as the source data.
:type source_project_dataset_table: string
:param destination_cloud_storage_uris: The destination Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). Follows
convention defined here:
Expand All @@ -222,14 +222,11 @@ def run_extract(self, source_dataset_table, destination_cloud_storage_uris, comp
:param print_header: Whether to print a header for a CSV file extract.
:type print_header: boolean
"""
assert '.' in source_dataset_table, \
'Expected source_dataset_table in the format of <dataset>.<table>. Got: {}'.format(source_dataset_table)

source_dataset, source_table = source_dataset_table.split('.', 1)
source_project, source_dataset, source_table = self._split_project_dataset_table_input('source_project_dataset_table', source_project_dataset_table)
configuration = {
'extract': {
'sourceTable': {
'projectId': self.project_id,
'projectId': source_project,
'datasetId': source_dataset,
'tableId': source_table,
},
Expand All @@ -248,7 +245,11 @@ def run_extract(self, source_dataset_table, destination_cloud_storage_uris, comp

return self.run_with_configuration(configuration)

def run_copy(self, source_dataset_tables, destination_project_dataset_table, write_disposition='WRITE_EMPTY', create_disposition='CREATE_IF_NEEDED'):
def run_copy(self,
source_project_dataset_tables,
destination_project_dataset_table,
write_disposition='WRITE_EMPTY',
reate_disposition='CREATE_IF_NEEDED'):

This comment has been minimized.

Copy link
@WesleyBatista

WesleyBatista Mar 11, 2016

Contributor

typo here?

"""
Executes a BigQuery copy command to copy data from one BigQuery table
to another. See here:
Expand All @@ -257,10 +258,11 @@ def run_copy(self, source_dataset_tables, destination_project_dataset_table, wri
For more details about these parameters.
:param source_dataset_tables: One or more dotted <dataset>.<table>
:param source_project_dataset_tables: One or more dotted (<project>.)<dataset>.<table>
BigQuery tables to use as the source data. Use a list if there are
multiple source tables.
:type source_dataset_tables: list|string
If <project> is not included, project will be the project defined in the connection json.
:type source_project_dataset_tables: list|string
:param destination_project_dataset_table: The destination BigQuery
table. Format is: <project>.<dataset>.<table>
:type destination_project_dataset_table: string
Expand All @@ -269,19 +271,16 @@ def run_copy(self, source_dataset_tables, destination_project_dataset_table, wri
:param create_disposition: The create disposition if the table doesn't exist.
:type create_disposition: string
"""
source_dataset_tables = [source_dataset_tables] if not isinstance(source_dataset_tables, list) else source_dataset_tables
source_project_dataset_tables = []
source_project_dataset_tables = [source_project_dataset_tables] if not isinstance(source_project_dataset_tables, list) else source_project_dataset_tables

for source_dataset_table in source_dataset_tables:
assert '.' in source_dataset_table, \
'Expected source_dataset_table in the format of <dataset>.<table>. Got: {}'.format(source_dataset_table)

source_dataset, source_table = source_dataset_table.split('.', 1)
source_project_dataset_tables.append({
'projectId': self.project_id,
'datasetId': source_dataset,
'tableId': source_table
})
source_project_dataset_tables_fixup = []
for source_project_dataset_table in source_project_dataset_tables:
source_project, source_dataset, source_table = self._split_project_dataset_table_input('source_project_dataset_table', source_project_dataset_table)
source_project_dataset_tables_fixup.append({
'projectId': source_project,
'datasetId': source_dataset,
'tableId': source_table
})

assert 3 == len(destination_project_dataset_table.split('.')), \
'Expected destination_project_dataset_table in the format of <project>.<dataset>.<table>. Got: {}'.format(destination_project_dataset_table)
Expand All @@ -291,7 +290,7 @@ def run_copy(self, source_dataset_tables, destination_project_dataset_table, wri
'copy': {
'createDisposition': create_disposition,
'writeDisposition': write_disposition,
'sourceTables': source_project_dataset_tables,
'sourceTables': source_project_dataset_tables_fixup,
'destinationTable': {
'projectId': destination_project,
'datasetId': destination_dataset,
Expand All @@ -302,7 +301,14 @@ def run_copy(self, source_dataset_tables, destination_project_dataset_table, wri

return self.run_with_configuration(configuration)

def run_load(self, destination_dataset_table, schema_fields, source_uris, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=','):
def run_load(self,
destination_project_dataset_table,
schema_fields, source_uris,
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
skip_leading_rows=0,
write_disposition='WRITE_EMPTY',
field_delimiter=','):
"""
Executes a BigQuery load command to load data from Google Cloud Storage
to BigQuery. See here:
Expand All @@ -311,8 +317,9 @@ def run_load(self, destination_dataset_table, schema_fields, source_uris, source
For more details about these parameters.
:param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to load data into.
:type destination_dataset_table: string
:param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table> BigQuery table to load data into.
If <project> is not included, project will be the project defined in the connection json.
:type destination_project_dataset_table: string
:param schema_fields: The schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
:type schema_fields: list
Expand All @@ -331,16 +338,13 @@ def run_load(self, destination_dataset_table, schema_fields, source_uris, source
:param field_delimiter: The delimiter to use when loading from a CSV.
:type field_delimiter: string
"""
assert '.' in destination_dataset_table, \
'Expected destination_dataset_table in the format of <dataset>.<table>. Got: {}'.format(destination_dataset_table)

destination_dataset, destination_table = destination_dataset_table.split('.', 1)
destination_project, destination_dataset, destination_table = self._split_project_dataset_table_input('destination_project_dataset_table', destination_project_dataset_table)

configuration = {
'load': {
'createDisposition': create_disposition,
'destinationTable': {
'projectId': self.project_id,
'projectId': destination_project,
'datasetId': destination_dataset,
'tableId': destination_table,
},
Expand All @@ -359,6 +363,27 @@ def run_load(self, destination_dataset_table, schema_fields, source_uris, source

return self.run_with_configuration(configuration)

def _split_project_dataset_table_input(self, var_name, project_dataset_table):
"""
:param var_name: the name of the variable input, for logging and erroring purposes.
:type var_name: str
:param project_dataset_table: input string in (<project>.)<dataset>.<project> format.
if project is not included in the string, self.project_id will be returned in the tuple.
:type project_dataset_table: str
:return: (project, dataset, table) tuple
"""
table_split = project_dataset_table.split('.')
assert len(table_split) == 2 or len(table_split) == 3, \
'Expected {var} in the format of (<project.)<dataset>.<table>, got {input}'.format(var=var_name, input=project_dataset_table)

if len(table_split) == 2:
logging.info('project not included in {var}: {input}; using project "{project}"'.format(var=var_name, input=project_dataset_table, project=self.project_id))
dataset, table = table_split
return self.project_id, dataset, table
else:
project, dataset, table = table_split
return project, dataset, table

def run_with_configuration(self, configuration):
"""
Executes a BigQuery SQL query. See here:
Expand Down
19 changes: 10 additions & 9 deletions airflow/contrib/operators/bigquery_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ class BigQueryToBigQueryOperator(BaseOperator):
"""
Copy a BigQuery table to another BigQuery table.
"""
template_fields = ('source_dataset_tables','destination_project_dataset_table',)
template_fields = ('source_project_dataset_tables','destination_project_dataset_table',)
template_ext = ('.sql',)
ui_color = '#e6f0e4'

@apply_defaults
def __init__(
self,
source_dataset_tables,
source_project_dataset_tables,
destination_project_dataset_table,
write_disposition='WRITE_EMPTY',
create_disposition='CREATE_IF_NEEDED',
Expand All @@ -30,10 +30,11 @@ def __init__(
For more details about these parameters.
:param source_dataset_tables: One or more dotted <dataset>.<table>
BigQuery tables to use as the source data. Use a list if there are
multiple source tables.
:type source_dataset_tables: list|string
:param source_project_dataset_tables: One or more dotted (<project>.)<dataset>.<table>
BigQuery tables to use as the source data.
If <project> is not included, project will be the project defined in the connection json.
Use a list if there are multiple source tables.
:type source_project_dataset_tables: list|string
:param destination_project_dataset_table: The destination BigQuery
table. Format is: <project>.<dataset>.<table>
:type destination_project_dataset_table: string
Expand All @@ -48,20 +49,20 @@ def __init__(
:type delegate_to: string
"""
super(BigQueryToBigQueryOperator, self).__init__(*args, **kwargs)
self.source_dataset_tables = source_dataset_tables
self.source_project_dataset_tables = source_project_dataset_tables
self.destination_project_dataset_table = destination_project_dataset_table
self.write_disposition = write_disposition
self.create_disposition = create_disposition
self.bigquery_conn_id = bigquery_conn_id
self.delegate_to = delegate_to

def execute(self, context):
logging.info('Executing copy of %s into: %s', self.source_dataset_tables, self.destination_project_dataset_table)
logging.info('Executing copy of %s into: %s', self.source_project_dataset_tables, self.destination_project_dataset_table)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to)
conn = hook.get_conn()
cursor = conn.cursor()
cursor.run_copy(
self.source_dataset_tables,
self.source_project_dataset_tables,
self.destination_project_dataset_table,
self.write_disposition,
self.create_disposition)
15 changes: 8 additions & 7 deletions airflow/contrib/operators/bigquery_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ class BigQueryToCloudStorageOperator(BaseOperator):
"""
Transfers a BigQuery table to a Google Cloud Storage bucket.
"""
template_fields = ('source_dataset_table','destination_cloud_storage_uris',)
template_fields = ('source_project_dataset_table','destination_cloud_storage_uris',)
template_ext = ('.sql',)
ui_color = '#e4e6f0'

@apply_defaults
def __init__(
self,
source_dataset_table,
source_project_dataset_table,
destination_cloud_storage_uris,
compression='NONE',
export_format='CSV',
Expand All @@ -33,8 +33,9 @@ def __init__(
For more details about these parameters.
:param source_dataset_table: The dotted <dataset>.<table> BigQuery table to use as the source data.
:type source_dataset_table: string
:param source_project_dataset_table: The dotted (<project>.)<dataset>.<table> BigQuery table to use as the
source data. If <project> is not included, project will be the project defined in the connection json.
:type source_project_dataset_table: string
:param destination_cloud_storage_uris: The destination Google Cloud
Storage URI (e.g. gs://some-bucket/some-file.txt). Follows
convention defined here:
Expand All @@ -55,7 +56,7 @@ def __init__(
:type delegate_to: string
"""
super(BigQueryToCloudStorageOperator, self).__init__(*args, **kwargs)
self.source_dataset_table = source_dataset_table
self.source_project_dataset_table = source_project_dataset_table
self.destination_cloud_storage_uris = destination_cloud_storage_uris
self.compression = compression
self.export_format = export_format
Expand All @@ -65,12 +66,12 @@ def __init__(
self.delegate_to = delegate_to

def execute(self, context):
logging.info('Executing extract of %s into: %s', self.source_dataset_table, self.destination_cloud_storage_uris)
logging.info('Executing extract of %s into: %s', self.source_project_dataset_table, self.destination_cloud_storage_uris)
hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, delegate_to=self.delegate_to)
conn = hook.get_conn()
cursor = conn.cursor()
cursor.run_extract(
self.source_dataset_table,
self.source_project_dataset_table,
self.destination_cloud_storage_uris,
self.compression,
self.export_format,
Expand Down
15 changes: 8 additions & 7 deletions airflow/contrib/operators/gcs_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(
self,
bucket,
source_objects,
destination_dataset_table,
destination_project_dataset_table,
schema_fields=False,
schema_object=False,
source_format='CSV',
Expand All @@ -43,8 +43,9 @@ def __init__(
:type bucket: string
:param source_objects: List of Google cloud storage URIs to load from.
:type object: list
:param destination_dataset_table: The dotted <dataset>.<table> BigQuery table to load data into.
:type destination_dataset_table: string
:param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table> BigQuery table to load data
into. If <project> is not included, project will be the project defined in the connection json.
:type destination_project_dataset_table: string
:param schema_fields: If set, the schema field list as defined here:
https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
:type schema_fields: list
Expand Down Expand Up @@ -85,7 +86,7 @@ def __init__(
self.schema_object = schema_object

# BQ config
self.destination_dataset_table = destination_dataset_table
self.destination_project_dataset_table = destination_project_dataset_table
self.schema_fields = schema_fields
self.source_format = source_format
self.create_disposition = create_disposition
Expand All @@ -109,7 +110,7 @@ def execute(self, context):
conn = bq_hook.get_conn()
cursor = conn.cursor()
cursor.run_load(
destination_dataset_table=self.destination_dataset_table,
destination_project_dataset_table=self.destination_project_dataset_table,
schema_fields=schema_fields,
source_uris=source_uris,
source_format=self.source_format,
Expand All @@ -119,8 +120,8 @@ def execute(self, context):
field_delimiter=self.field_delimiter)

if self.max_id_key:
cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_dataset_table))
cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_project_dataset_table))
row = cursor.fetchone()
max_id = row[0] if row[0] else 0
logging.info('Loaded BQ data with max {}.{}={}'.format(self.destination_dataset_table, self.max_id_key, max_id))
logging.info('Loaded BQ data with max {}.{}={}'.format(self.destination_project_dataset_table, self.max_id_key, max_id))
return max_id

0 comments on commit ab73b70

Please sign in to comment.