diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
new file mode 100644
index 0000000000000..c0962c73e658d
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Integration test for Google Cloud BigQuery.
+"""
+
+from __future__ import absolute_import
+
+import datetime
+import logging
+import time
+import unittest
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.io.gcp import big_query_query_to_table_pipeline
+from apache_beam.io.gcp.bigquery import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass
+
+BIG_QUERY_DATASET_ID = 'python_query_to_table_'
+NEW_TYPES_INPUT_TABLE = 'python_new_types_table'
+NEW_TYPES_OUTPUT_SCHEMA = (
+    '{"fields": [{"name": "bytes","type": "BYTES"},'
+    '{"name": "date","type": "DATE"},{"name": "time","type": "TIME"}]}')
+NEW_TYPES_OUTPUT_VERIFY_QUERY = ('SELECT date FROM [%s];')
+# The TIME and BYTES types are problematic in the current version of BigQuery,
+# so only the DATE column is verified here.
+NEW_TYPES_OUTPUT_EXPECTED = [
+    (datetime.date(2000, 1, 1),),
+    (datetime.date(2011, 1, 1),),
+    (datetime.date(3000, 12, 31),)]
+LEGACY_QUERY = (
+    'SELECT * FROM (SELECT "apple" as fruit), (SELECT "orange" as fruit),')
+STANDARD_QUERY = (
+    'SELECT * FROM (SELECT "apple" as fruit) '
+    'UNION ALL (SELECT "orange" as fruit)')
+NEW_TYPES_QUERY = (
+    'SELECT bytes, date, time FROM [%s.%s]')
+DIALECT_OUTPUT_SCHEMA = ('{"fields": [{"name": "fruit","type": "STRING"}]}')
+DIALECT_OUTPUT_VERIFY_QUERY = ('SELECT fruit from [%s];')
+DIALECT_OUTPUT_EXPECTED = [(u'apple',), (u'orange',)]
+
+
+class BigQueryQueryToTableIT(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+
+    self.bigquery_client = BigQueryWrapper()
+    self.dataset_id = BIG_QUERY_DATASET_ID + str(int(time.time()))
+    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+    self.output_table = "%s.output_table" % (self.dataset_id)
+
+  def tearDown(self):
+    request = bigquery.BigqueryDatasetsDeleteRequest(
+        projectId=self.project, datasetId=self.dataset_id,
+        deleteContents=True)
+    try:
+      self.bigquery_client.client.datasets.Delete(request)
+    except HttpError:
+      logging.debug('Failed to clean up dataset %s' % self.dataset_id)
+
+  def _setup_new_types_env(self):
+    table_schema = bigquery.TableSchema()
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'bytes'
+    table_field.type = 'BYTES'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'date'
+    table_field.type = 'DATE'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'time'
+    table_field.type = 'TIME'
+    table_schema.fields.append(table_field)
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=self.project,
+            datasetId=self.dataset_id,
+            tableId=NEW_TYPES_INPUT_TABLE),
+        schema=table_schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=self.project, datasetId=self.dataset_id, table=table)
+    self.bigquery_client.client.tables.Insert(request)
+    table_data = [
+        {'bytes': b'xyw=', 'date': '2011-01-01', 'time': '23:59:59.999999'},
+        {'bytes': b'abc=', 'date': '2000-01-01', 'time': '00:00:00'},
+        {'bytes': b'dec=', 'date': '3000-12-31', 'time': '23:59:59.990000'}
+    ]
+    self.bigquery_client.insert_rows(
+        self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
+
+  @attr('IT')
+  def test_big_query_legacy_sql(self):
+    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
+    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
+    extra_opts = {'query': LEGACY_QUERY,
+                  'output': self.output_table,
+                  'output_schema': DIALECT_OUTPUT_SCHEMA,
+                  'use_standard_sql': False,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+  @attr('IT')
+  def test_big_query_standard_sql(self):
+    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
+    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
+    extra_opts = {'query': STANDARD_QUERY,
+                  'output': self.output_table,
+                  'output_schema': DIALECT_OUTPUT_SCHEMA,
+                  'use_standard_sql': True,
+                  'on_success_matcher': all_of(*pipeline_verifiers)}
+    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+  @attr('IT')
+  def test_big_query_new_types(self):
+    expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
+    verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
+    self._setup_new_types_env()
+    extra_opts = {
+        'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
+        'output': self.output_table,
+        'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
+        'use_standard_sql': False,
+        'on_success_matcher': all_of(*pipeline_verifiers)}
+    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
+    big_query_query_to_table_pipeline.run_bq_pipeline(options)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
new file mode 100644
index 0000000000000..65a4941f59ab5
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Job that reads and writes data to BigQuery.
+
+A Dataflow job that reads from BigQuery using a query and writes the results
+to a BigQuery table at the end of the pipeline.
+"""
+
+# pylint: disable=wrong-import-order, wrong-import-position
+from __future__ import absolute_import
+
+import argparse
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+def run_bq_pipeline(argv=None):
+  """Run the sample BigQuery pipeline.
+
+  Args:
+    argv: Arguments to the run function.
+ """ + parser = argparse.ArgumentParser() + parser.add_argument('--query', required=True, + help='Query to process for the table.') + parser.add_argument('--output', required=True, + help='Output BQ table to write results to.') + parser.add_argument('--output_schema', dest='output_schema', required=True, + help='Schema for output BQ table.') + parser.add_argument('--use_standard_sql', action='store_true', + dest='use_standard_sql', + help='Output BQ table to write results to.') + known_args, pipeline_args = parser.parse_known_args(argv) + + table_schema = parse_table_schema_from_json(known_args.output_schema) + + p = TestPipeline(options=PipelineOptions(pipeline_args)) + + # pylint: disable=expression-not-assigned + # pylint: disable=bad-continuation + (p | 'read' >> beam.io.Read(beam.io.BigQuerySource( + query=known_args.query, use_standard_sql=known_args.use_standard_sql)) + | 'write' >> beam.io.Write(beam.io.BigQuerySink( + known_args.output, + schema=table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))) + + result = p.run() + result.wait_until_finish() diff --git a/sdks/python/scripts/run_postcommit.sh b/sdks/python/scripts/run_postcommit.sh index a228c7f2b588a..d0486c8add7c2 100755 --- a/sdks/python/scripts/run_postcommit.sh +++ b/sdks/python/scripts/run_postcommit.sh @@ -97,9 +97,16 @@ fi TESTS="" if [[ "$3" = "TestDirectRunner" ]]; then - TESTS="--tests=\ -apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it,\ -apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest" + if [[ "$2" = "streaming" ]]; then + TESTS="--tests=\ + apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it,\ + apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest" + else + TESTS="--tests=\ + apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it,\ + apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest,\ + apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT" + fi fi ###########################################################################