Add big_query_query_to_table_it to python SDK
Boyuan Zhang committed Sep 18, 2018
1 parent 8981a82 commit 092f7d9
Showing 3 changed files with 250 additions and 3 deletions.
172 changes: 172 additions & 0 deletions sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -0,0 +1,172 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Integration test for Google Cloud BigQuery.
"""

from __future__ import absolute_import

import datetime
import logging
import time
import unittest

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp import big_query_query_to_table_pipeline
from apache_beam.io.gcp.bigquery import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

# pylint: disable=wrong-import-order, wrong-import-position
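# apitools may be absent when GCP dependencies are not installed, so the
# import is guarded; HttpError is only referenced in tearDown.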
try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  pass

BIG_QUERY_DATASET_ID = 'python_query_to_table_'
NEW_TYPES_INPUT_TABLE = 'python_new_types_table'
NEW_TYPES_OUTPUT_SCHEMA = (
    '{"fields": [{"name": "bytes","type": "BYTES"},'
    '{"name": "date","type": "DATE"},{"name": "time","type": "TIME"}]}')
NEW_TYPES_OUTPUT_VERIFY_QUERY = ('SELECT date FROM [%s];')
# Only the date column is verified; the time and bytes columns are skipped
# due to issues with the current version of BigQuery.
NEW_TYPES_OUTPUT_EXPECTED = [
    (datetime.date(2000, 1, 1),),
    (datetime.date(2011, 1, 1),),
    (datetime.date(3000, 12, 31),)]
LEGACY_QUERY = (
    'SELECT * FROM (SELECT "apple" as fruit), (SELECT "orange" as fruit),')
STANDARD_QUERY = (
    'SELECT * FROM (SELECT "apple" as fruit) '
    'UNION ALL (SELECT "orange" as fruit)')
NEW_TYPES_QUERY = (
    'SELECT bytes, date, time FROM [%s.%s]')
DIALECT_OUTPUT_SCHEMA = ('{"fields": [{"name": "fruit","type": "STRING"}]}')
DIALECT_OUTPUT_VERIFY_QUERY = ('SELECT fruit from [%s];')
DIALECT_OUTPUT_EXPECTED = [(u'apple',), (u'orange',)]


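# End-to-end tests: each case builds pipeline options, runs
# big_query_query_to_table_pipeline against real BigQuery resources created
# in setUp, and verifies the output table; tearDown drops the dataset.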
class BigQueryQueryToTableIT(unittest.TestCase):
  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    self.runner_name = type(self.test_pipeline.runner).__name__
    self.project = self.test_pipeline.get_option('project')

    self.bigquery_client = BigQueryWrapper()
    self.dataset_id = BIG_QUERY_DATASET_ID + str(int(time.time()))
    self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
    self.output_table = "%s.output_table" % (self.dataset_id)

  def tearDown(self):
    request = bigquery.BigqueryDatasetsDeleteRequest(
        projectId=self.project, datasetId=self.dataset_id,
        deleteContents=True)
    try:
      self.bigquery_client.client.datasets.Delete(request)
    except HttpError:
      logging.debug('Failed to clean up dataset %s', self.dataset_id)

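  # Creates the BYTES/DATE/TIME input table and populates the rows read by
  # test_big_query_new_types.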
  def _setup_new_types_env(self):
    table_schema = bigquery.TableSchema()
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'bytes'
    table_field.type = 'BYTES'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'date'
    table_field.type = 'DATE'
    table_schema.fields.append(table_field)
    table_field = bigquery.TableFieldSchema()
    table_field.name = 'time'
    table_field.type = 'TIME'
    table_schema.fields.append(table_field)
    table = bigquery.Table(
        tableReference=bigquery.TableReference(
            projectId=self.project,
            datasetId=self.dataset_id,
            tableId=NEW_TYPES_INPUT_TABLE),
        schema=table_schema)
    request = bigquery.BigqueryTablesInsertRequest(
        projectId=self.project, datasetId=self.dataset_id, table=table)
    self.bigquery_client.client.tables.Insert(request)
    table_data = [
        {'bytes': b'xyw=', 'date': '2011-01-01', 'time': '23:59:59.999999'},
        {'bytes': b'abc=', 'date': '2000-01-01', 'time': '00:00:00'},
        {'bytes': b'dec=', 'date': '3000-12-31', 'time': '23:59:59.990000'}
    ]
    self.bigquery_client.insert_rows(
        self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)

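  # Each test runs the pipeline with on_success_matcher set, so the run only
  # passes if the pipeline reaches the expected terminal state and the
  # BigqueryMatcher checksum of the verify query matches the expected rows.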
  @attr('IT')
  def test_big_query_legacy_sql(self):
    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
        project=self.project,
        query=verify_query,
        checksum=expected_checksum)]
    extra_opts = {'query': LEGACY_QUERY,
                  'output': self.output_table,
                  'output_schema': DIALECT_OUTPUT_SCHEMA,
                  'use_standard_sql': False,
                  'on_success_matcher': all_of(*pipeline_verifiers)}
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    big_query_query_to_table_pipeline.run_bq_pipeline(options)

  @attr('IT')
  def test_big_query_standard_sql(self):
    verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
    expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
        project=self.project,
        query=verify_query,
        checksum=expected_checksum)]
    extra_opts = {'query': STANDARD_QUERY,
                  'output': self.output_table,
                  'output_schema': DIALECT_OUTPUT_SCHEMA,
                  'use_standard_sql': True,
                  'on_success_matcher': all_of(*pipeline_verifiers)}
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    big_query_query_to_table_pipeline.run_bq_pipeline(options)

  @attr('IT')
  def test_big_query_new_types(self):
    expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
    verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
        project=self.project,
        query=verify_query,
        checksum=expected_checksum)]
    self._setup_new_types_env()
    extra_opts = {
        'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
        'output': self.output_table,
        'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
        'use_standard_sql': False,
        'on_success_matcher': all_of(*pipeline_verifiers)}
    options = self.test_pipeline.get_full_options_as_args(**extra_opts)
    big_query_query_to_table_pipeline.run_bq_pipeline(options)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.DEBUG)
  unittest.main()
68 changes: 68 additions & 0 deletions sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -0,0 +1,68 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Job that reads and writes data to BigQuery.
A Dataflow job that reads from BQ using a query and then writes to a
big query table at the end of the pipeline.
"""

# pylint: disable=wrong-import-order, wrong-import-position
from __future__ import absolute_import

import argparse

import apache_beam as beam
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline


def run_bq_pipeline(argv=None):
  """Run the sample BigQuery pipeline.

  Args:
    argv: Arguments to the run function.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument('--query', required=True,
                      help='Query to process for the table.')
  parser.add_argument('--output', required=True,
                      help='Output BQ table to write results to.')
  parser.add_argument('--output_schema', dest='output_schema', required=True,
                      help='Schema for output BQ table.')
  parser.add_argument('--use_standard_sql', action='store_true',
                      dest='use_standard_sql',
                      help='Use standard SQL dialect for the query.')
  known_args, pipeline_args = parser.parse_known_args(argv)
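  # Flags not recognized above (e.g. --project, --runner) remain in
  # pipeline_args and are handed to PipelineOptions below.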

  table_schema = parse_table_schema_from_json(known_args.output_schema)

  p = TestPipeline(options=PipelineOptions(pipeline_args))

  # pylint: disable=expression-not-assigned
  # pylint: disable=bad-continuation
  (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
      query=known_args.query, use_standard_sql=known_args.use_standard_sql))
   | 'write' >> beam.io.Write(beam.io.BigQuerySink(
       known_args.output,
       schema=table_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

  result = p.run()
  result.wait_until_finish()
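
For illustration, a minimal sketch of how this module might be invoked directly. The project, dataset, and table names are placeholders, and this invocation style is an assumption for illustration; in the integration tests above the flag list is produced by TestPipeline.get_full_options_as_args instead.

# Hypothetical direct invocation of run_bq_pipeline; unrecognized flags such
# as --project and --runner fall through parse_known_args into
# PipelineOptions.
from apache_beam.io.gcp import big_query_query_to_table_pipeline

big_query_query_to_table_pipeline.run_bq_pipeline([
    '--query', 'SELECT "apple" AS fruit',
    '--output', 'my_dataset.output_table',
    '--output_schema', '{"fields": [{"name": "fruit","type": "STRING"}]}',
    '--use_standard_sql',
    '--project', 'my-gcp-project',
    '--runner', 'DirectRunner',
])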
13 changes: 10 additions & 3 deletions sdks/python/scripts/run_postcommit.sh
@@ -97,9 +97,16 @@ fi

TESTS=""
if [[ "$3" = "TestDirectRunner" ]]; then
TESTS="--tests=\
apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it,\
apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest"
if [[ "$2" = "streaming" ]]; then
TESTS="--tests=\
apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it,\
apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest"
else
TESTS="--tests=\
apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it,\
apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest,\
apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT"
fi
fi

###########################################################################
