Commit
Add big_query_query_to_table_it to python SDK
Boyuan Zhang committed Sep 18, 2018
1 parent 8981a82 · commit e8c3f04
Showing 3 changed files with 250 additions and 1 deletion.
sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
172 changes: 172 additions & 0 deletions
@@ -0,0 +1,172 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Integration test for Google Cloud BigQuery.
"""

from __future__ import absolute_import

import datetime
import logging
import time
import unittest

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp import big_query_query_to_table_pipeline
from apache_beam.io.gcp.bigquery import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=wrong-import-order, wrong-import-position
try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  pass

BIG_QUERY_DATASET_ID = 'python_query_to_table_'
NEW_TYPES_INPUT_TABLE = 'python_new_types_table'
NEW_TYPES_OUTPUT_SCHEMA = (
    '{"fields": [{"name": "bytes","type": "BYTES"},'
    '{"name": "date","type": "DATE"},{"name": "time","type": "TIME"}]}')
NEW_TYPES_OUTPUT_VERIFY_QUERY = ('SELECT date FROM [%s];')
# The current version of BigQuery has problems returning the time and bytes
# columns from a query, so only the date column is verified here.
NEW_TYPES_OUTPUT_EXPECTED = [
    (datetime.date(2000, 1, 1),),
    (datetime.date(2011, 1, 1),),
    (datetime.date(3000, 12, 31),)]
LEGACY_QUERY = (
    'SELECT * FROM (SELECT "apple" as fruit), (SELECT "orange" as fruit),')
STANDARD_QUERY = (
    'SELECT * FROM (SELECT "apple" as fruit) '
    'UNION ALL (SELECT "orange" as fruit)')
NEW_TYPES_QUERY = (
    'SELECT bytes, date, time FROM [%s.%s]')
DIALECT_OUTPUT_SCHEMA = ('{"fields": [{"name": "fruit","type": "STRING"}]}')
DIALECT_OUTPUT_VERIFY_QUERY = ('SELECT fruit from [%s];')
DIALECT_OUTPUT_EXPECTED = [(u'apple',), (u'orange',)]


class BigQueryQueryToTableIT(unittest.TestCase):
  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    self.runner_name = type(self.test_pipeline.runner).__name__
    self.project = self.test_pipeline.get_option('project')

    self.bigquery_client = BigQueryWrapper()
    self.dataset_id = BIG_QUERY_DATASET_ID + str(int(time.time()))
    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
    self.output_table = "%s.output_table" % (self.dataset_id)

  def tearDown(self):
    request = bigquery.BigqueryDatasetsDeleteRequest(
        projectId=self.project, datasetId=self.dataset_id,
        deleteContents=True)
    try:
      self.bigquery_client.client.datasets.Delete(request)
    except HttpError:
      logging.debug('Failed to clean up dataset %s' % self.dataset_id)

  def _setup_new_types_env(self):
    table_schema = bigquery.TableSchema()
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'bytes'
    table_field.type = 'BYTES'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'date'
    table_field.type = 'DATE'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'time'
    table_field.type = 'TIME'
    table_schema.fields.append(table_field)
    table = bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId=self.project,
            datasetId=self.dataset_id,
            tableId=NEW_TYPES_INPUT_TABLE),
        schema=table_schema)
    request = bigquery.BigqueryTablesInsertRequest(
        projectId=self.project, datasetId=self.dataset_id, table=table)
    self.bigquery_client.client.tables.Insert(request)
    table_data = [
        {'bytes':b'xyw=', 'date':'2011-01-01', 'time':'23:59:59.999999'},
        {'bytes':b'abc=', 'date':'2000-01-01', 'time':'00:00:00'},
        {'bytes':b'dec=', 'date':'3000-12-31', 'time':'23:59:59.990000'}
    ]
    self.bigquery_client.insert_rows(
        self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)

  @attr('IT')
  def test_big_query_legacy_sql(self):
    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
        project=self.project,
        query=verify_query,
        checksum=expected_checksum)]
    extra_opts = {'query': LEGACY_QUERY,
                  'output': self.output_table,
                  'output_schema': DIALECT_OUTPUT_SCHEMA,
                  'use_standard_sql': False,
                  'on_success_matcher': all_of(*pipeline_verifiers)}
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    big_query_query_to_table_pipeline.run_bq_pipeline(options)

  @attr('IT')
  def test_big_query_standard_sql(self):
    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
        project=self.project,
        query=verify_query,
        checksum=expected_checksum)]
    extra_opts = {'query': STANDARD_QUERY,
                  'output': self.output_table,
                  'output_schema': DIALECT_OUTPUT_SCHEMA,
                  'use_standard_sql': True,
                  'on_success_matcher': all_of(*pipeline_verifiers)}
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    big_query_query_to_table_pipeline.run_bq_pipeline(options)

  @attr('IT')
  def test_big_query_new_types(self):
    expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
    verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
        project=self.project,
        query=verify_query,
        checksum=expected_checksum)]
    self._setup_new_types_env()
    extra_opts = {
        'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
        'output': self.output_table,
        'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
        'use_standard_sql': False,
        'on_success_matcher': all_of(*pipeline_verifiers)}
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    big_query_query_to_table_pipeline.run_bq_pipeline(options)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.DEBUG)
  unittest.main()
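
Note on the verification pattern above: the test precomputes a checksum of the expected rows, and after the pipeline succeeds BigqueryMatcher runs the verify query against the output table and checks that the returned rows hash to the same value. The snippet below is only a rough illustration of that idea under that assumption, not the matcher's actual implementation.

# Illustration only: compute_hash is the same helper the test uses; the
# comparison against the query result is what BigqueryMatcher is assumed
# to perform internally.
from apache_beam.testing import test_utils

expected_rows = [(u'apple',), (u'orange',)]  # DIALECT_OUTPUT_EXPECTED
expected_checksum = test_utils.compute_hash(expected_rows)

rows_from_verify_query = [(u'apple',), (u'orange',)]  # stand-in query result
assert test_utils.compute_hash(rows_from_verify_query) == expected_checksum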
sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
68 changes: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Job that reads and writes data to BigQuery. | ||
A Dataflow job that reads from BQ using a query and then writes to a | ||
big query table at the end of the pipeline. | ||
""" | ||
|
||
# pylint: disable=wrong-import-order, wrong-import-position | ||
from __future__ import absolute_import | ||
|
||
import argparse | ||
|
||
import apache_beam as beam | ||
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json | ||
from apache_beam.options.pipeline_options import PipelineOptions | ||
from apache_beam.testing.test_pipeline import TestPipeline | ||
|
||
|
||
def run_bq_pipeline(argv=None): | ||
"""Run the sample BigQuery pipeline. | ||
Args: | ||
argv: Arguments to the run function. | ||
""" | ||
parser = argparse.ArgumentParser() | ||
parser.add_argument('--query', required=True, | ||
help='Query to process for the table.') | ||
parser.add_argument('--output', required=True, | ||
help='Output BQ table to write results to.') | ||
parser.add_argument('--output_schema', dest='output_schema', required=True, | ||
help='Schema for output BQ table.') | ||
  parser.add_argument('--use_standard_sql', action='store_true',
                      dest='use_standard_sql',
                      help='Use standard SQL syntax for the query.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  table_schema = parse_table_schema_from_json(known_args.output_schema)

  p = TestPipeline(options=PipelineOptions(pipeline_args))

  # pylint: disable=expression-not-assigned
  # pylint: disable=bad-continuation
  (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
      query=known_args.query, use_standard_sql=known_args.use_standard_sql))
   | 'write' >> beam.io.Write(beam.io.BigQuerySink(
       known_args.output,
       schema=table_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

  result = p.run()
  result.wait_until_finish()
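
For reference, a minimal sketch of driving this module directly; the project, bucket, dataset, and table names below are placeholders, not values from this commit. The integration test above builds the same kind of argument list through TestPipeline.get_full_options_as_args.

# Hypothetical direct invocation of run_bq_pipeline with placeholder names.
from apache_beam.io.gcp import big_query_query_to_table_pipeline

argv = [
    # Consumed by the argparse parser defined above.
    '--query=SELECT "apple" as fruit',
    '--output=my_dataset.output_table',
    '--output_schema={"fields": [{"name": "fruit","type": "STRING"}]}',
    # Everything else falls through to PipelineOptions via parse_known_args.
    '--project=my-gcp-project',
    '--temp_location=gs://my-bucket/tmp',
]
big_query_query_to_table_pipeline.run_bq_pipeline(argv)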