Commit d237b79: Fix lint
Boyuan Zhang committed Sep 18, 2018
1 parent 6682484 commit d237b79
Showing 2 changed files with 56 additions and 48 deletions.
93 changes: 49 additions & 44 deletions sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -17,47 +17,54 @@
"""
Integration test for Google Cloud BigQuery.
"""
-# pylint: disable=wrong-import-order, wrong-import-position

from __future__ import absolute_import

import datetime
import logging
-import unittest
import time
+import unittest

from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp import big_query_query_to_table_pipeline
from apache_beam.io.gcp.bigquery import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
-from apache_beam.testing import test_utils
from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher
+from apache_beam.testing import test_utils
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline

+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  pass

BIG_QUERY_DATASET_ID = 'python_query_to_table_'
NEW_TYPES_INPUT_TABLE = 'python_new_types_table'
NEW_TYPES_OUTPUT_SCHEMA = (
'{"fields": [{"name": "bytes","type": "BYTES"},'
'{"name": "date","type": "DATE"},{"name": "time","type": "TIME"}]}')
NEW_TYPES_OUTPUT_VERIFY_QUERY = ('SELECT date FROM [%s];')
# There are problems with query time and bytes with current version of bigquery.
NEW_TYPES_OUTPUT_EXPECTED = [
(datetime.date(2000, 1, 1),),
(datetime.date(2011, 1, 1),),
(datetime.date(3000, 12, 31),)]
LEGACY_QUERY = (
'SELECT * FROM (SELECT "apple" as fruit), (SELECT "orange" as fruit),')
STANDARD_QUERY = (
'SELECT * FROM (SELECT "apple" as fruit) '
'UNION ALL (SELECT "orange" as fruit)')
NEW_TYPES_QUERY = (
'SELECT bytes, date, time FROM [%s.%s]')
DIALECT_OUTPUT_SCHEMA = ('{"fields": [{"name": "fruit","type": "STRING"}]}')
DIALECT_OUTPUT_VERIFY_QUERY = ('SELECT fruit from [%s];')
DIALECT_OUTPUT_EXPECTED = [(u'apple',), (u'orange',)]


class BigQueryQueryToTableIT(unittest.TestCase):
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
@@ -75,7 +82,7 @@ def tearDown(self):
deleteContents=True)
try:
self.bigquery_client.client.datasets.Delete(request)
-    except:
+    except HttpError:
logging.debug('Failed to clean up dataset %s' % self.dataset_id)

def _setup_new_types_env(self):
@@ -102,9 +109,9 @@ def _setup_new_types_env(self):
projectId=self.project, datasetId=self.dataset_id, table=table)
self.bigquery_client.client.tables.Insert(request)
table_data = [
{'bytes':b'xyw=', 'date':'2011-01-01', 'time':'23:59:59.999999'},
{'bytes':b'abc=', 'date':'2000-01-01', 'time':'00:00:00'},
{'bytes':b'dec=', 'date':'3000-12-31', 'time':'23:59:59.990000'}
]
self.bigquery_client.insert_rows(
self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
@@ -113,55 +120,53 @@ def _setup_new_types_env(self):
def test_big_query_legacy_sql(self):
verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
-    pipeline_verifiers = [PipelineStateMatcher(),
-                          BigqueryMatcher(
-                              project=self.project,
-                              query=verify_query,
-                              checksum=expected_checksum)]
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
extra_opts = {'query': LEGACY_QUERY,
'output': self.output_table,
'output_schema': DIALECT_OUTPUT_SCHEMA,
'use_standard_sql': False,
'on_success_matcher': all_of(*pipeline_verifiers)}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)

@attr('IT')
def test_big_query_standard_sql(self):
verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
-    pipeline_verifiers = [PipelineStateMatcher(),
-                          BigqueryMatcher(
-                              project=self.project,
-                              query=verify_query,
-                              checksum=expected_checksum)]
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
extra_opts = {'query': STANDARD_QUERY,
'output': self.output_table,
'output_schema': DIALECT_OUTPUT_SCHEMA,
'use_standard_sql': True,
'on_success_matcher': all_of(*pipeline_verifiers)}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)

@attr('IT')
def test_big_query_new_types(self):
expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
-    pipeline_verifiers = [PipelineStateMatcher(),
-                          BigqueryMatcher(
-                              project=self.project,
-                              query=verify_query,
-                              checksum=expected_checksum)]
+    pipeline_verifiers = [PipelineStateMatcher(), BigqueryMatcher(
+        project=self.project,
+        query=verify_query,
+        checksum=expected_checksum)]
self._setup_new_types_env()
extra_opts = {
'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
'output': self.output_table,
'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
'use_standard_sql': False,
'on_success_matcher': all_of(*pipeline_verifiers)}
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()
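The one behavioral change in this file is the tearDown cleanup: the bare except clause becomes except HttpError:, backed by the guarded apitools import added before the module constants. Below is a minimal sketch of that pattern in isolation; the cleanup_dataset helper and the Exception fallback are illustrative assumptions, not part of the Beam module, which simply passes on ImportError because these tests only run where the GCP extras are installed.

import logging

# Guarded third-party import: apitools is only available with the GCP extras,
# so a missing package is tolerated instead of breaking module import.
try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = Exception  # illustrative fallback so the except clause below stays valid


def cleanup_dataset(bigquery_client, request, dataset_id):
  # Hypothetical helper mirroring tearDown: attempt the delete, but catch only
  # the BigQuery API error rather than every exception, and log the failure.
  try:
    bigquery_client.client.datasets.Delete(request)
  except HttpError:
    logging.debug('Failed to clean up dataset %s', dataset_id)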
11 changes: 7 additions & 4 deletions sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -21,14 +21,15 @@
big query table at the end of the pipeline.
"""

+# pylint: disable=wrong-import-order, wrong-import-position
from __future__ import absolute_import

import argparse

import apache_beam as beam
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
-from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline


def run_bq_pipeline(argv=None):
@@ -54,12 +55,14 @@ def run_bq_pipeline(argv=None):
p = TestPipeline(options=PipelineOptions(pipeline_args))

# pylint: disable=expression-not-assigned
+# pylint: disable=bad-continuation
(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
query=known_args.query, use_standard_sql=known_args.use_standard_sql))
| 'write' >> beam.io.Write(beam.io.BigQuerySink(
-       known_args.output, schema=table_schema,
-       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
+       known_args.output,
+       schema=table_schema,
+       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

result = p.run()
result.wait_until_finish()
