diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 0905ba764deb..7d5dd876bda1 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1029,6 +1029,16 @@ def __init__( self._step_name = step_name self._source_uuid = unique_id + def _get_project(self): + """Returns the project that queries and exports will be billed to.""" + if self.pipeline_options: + project = self.pipeline_options.view_as(GoogleCloudOptions).project + if isinstance(project, vp.ValueProvider): + project = project.get() + if project: + return project + return self.project + def _get_parent_project(self): """Returns the project that will be billed.""" if self.temp_table: @@ -1164,6 +1174,9 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self._setup_temporary_dataset(bq) self.table_reference = self._execute_query(bq) + if not self.table_reference.projectId: + self.table_reference.projectId = self._get_project() + requested_session = bq_storage.types.ReadSession() requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format( self.table_reference.projectId, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py new file mode 100644 index 000000000000..5a506d3162f9 --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -0,0 +1,540 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for BigQuery GEOGRAPHY data type support.""" + +import logging +import secrets +import time +import unittest + +import hamcrest as hc +import pytest + +import apache_beam as beam +from apache_beam.io.gcp.bigquery import ReadFromBigQuery +from apache_beam.io.gcp.bigquery import WriteToBigQuery +from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper +from apache_beam.io.gcp.internal.clients import bigquery +from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None + +_LOGGER = logging.getLogger(__name__) + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class BigQueryGeographyIntegrationTests(unittest.TestCase): + """Integration tests for BigQuery GEOGRAPHY data type.""" + + BIG_QUERY_DATASET_ID = 'python_geography_it_test_' + + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + self.runner_name = type(self.test_pipeline.runner).__name__ + self.project = self.test_pipeline.get_option('project') + + self.bigquery_client = BigQueryWrapper() + self.dataset_id = '%s%d%s' % ( + self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3)) + self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) + _LOGGER.info( + "Created dataset %s in project %s", self.dataset_id, self.project) + + def tearDown(self): + request = bigquery.BigqueryDatasetsDeleteRequest( + projectId=self.project, datasetId=self.dataset_id, deleteContents=True) + try: + _LOGGER.info( + "Deleting dataset %s in project %s", self.dataset_id, self.project) + self.bigquery_client.client.datasets.Delete(request) + except HttpError: + _LOGGER.debug( + 'Failed to clean up dataset %s in project %s', + self.dataset_id, + self.project) + + def create_geography_table(self, table_name, include_repeated=False): + """Create a table with various GEOGRAPHY field configurations.""" + table_schema = bigquery.TableSchema() + + # ID field + id_field = bigquery.TableFieldSchema() + id_field.name = 'id' + id_field.type = 'INTEGER' + id_field.mode = 'REQUIRED' + table_schema.fields.append(id_field) + + # Required GEOGRAPHY field + geo_required = bigquery.TableFieldSchema() + geo_required.name = 'location' + geo_required.type = 'GEOGRAPHY' + geo_required.mode = 'REQUIRED' + table_schema.fields.append(geo_required) + + # Nullable GEOGRAPHY field + geo_nullable = bigquery.TableFieldSchema() + geo_nullable.name = 'optional_location' + geo_nullable.type = 'GEOGRAPHY' + geo_nullable.mode = 'NULLABLE' + table_schema.fields.append(geo_nullable) + + if include_repeated: + # Repeated GEOGRAPHY field + geo_repeated = bigquery.TableFieldSchema() + geo_repeated.name = 'path' + geo_repeated.type = 'GEOGRAPHY' + geo_repeated.mode = 'REPEATED' + table_schema.fields.append(geo_repeated) + + table = bigquery.Table( + tableReference=bigquery.TableReference( + projectId=self.project, + datasetId=self.dataset_id, + tableId=table_name), + schema=table_schema) + request = bigquery.BigqueryTablesInsertRequest( + projectId=self.project, datasetId=self.dataset_id, table=table) + self.bigquery_client.client.tables.Insert(request) + + # Wait for table to be available + _ = self.bigquery_client.get_table( + self.project, self.dataset_id, table_name) + + @pytest.mark.it_postcommit + def test_geography_write_and_read_basic_geometries(self): + """Test writing and reading basic GEOGRAPHY geometries.""" + table_name = 'geography_basic_geometries' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + # Test data with various WKT geometry types + input_data = [ + { + 'id': 1, + 'location': 'POINT(30 10)', + 'optional_location': ('POINT(-122.4194 37.7749)') # San Francisco + }, + { + 'id': 2, + 'location': 'LINESTRING(30 10, 10 30, 40 40)', + 'optional_location': None + }, + { + 'id': 3, + 'location': ('POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))'), + 'optional_location': ('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))') + }, + { + 'id': 4, + 'location': ('MULTIPOINT((10 40), (40 30), (20 20), (30 10))'), + 'optional_location': 'POINT(0 0)' + }, + { + 'id': 5, + 'location': ( + 'MULTILINESTRING((10 10, 20 20, 10 40), ' + '(40 40, 30 30, 40 20, 30 10))'), + 'optional_location': None + } + ] + + table_schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] + } + + # Write data to BigQuery + with TestPipeline(is_integration_test=True) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STREAMING_INSERTS, + project=self.project)) + + # Read data back and verify + with TestPipeline(is_integration_test=True) as p: + result = ( + p + | 'ReadFromBQ' >> ReadFromBigQuery( + table=table_id, + project=self.project, + method=ReadFromBigQuery.Method.DIRECT_READ) + | 'ExtractGeography' >> beam.Map( + lambda row: + (row['id'], row['location'], row['optional_location']))) + + expected_data = [ + (1, 'POINT(30 10)', 'POINT(-122.4194 37.7749)'), + (2, 'LINESTRING(30 10, 10 30, 40 40)', None), + ( + 3, + 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', + 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), + (4, 'MULTIPOINT(20 20, 10 40, 40 30, 30 10)', 'POINT(0 0)'), + ( + 5, + 'MULTILINESTRING((10 10, 20 20, 10 40), ' + '(40 40, 30 30, 40 20, 30 10))', + None) + ] + + assert_that(result, equal_to(expected_data)) + + @pytest.mark.it_postcommit + def test_geography_write_with_beam_rows(self): + """Test writing GEOGRAPHY data using Beam Rows with GeographyType.""" + table_name = 'geography_beam_rows' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + # Create the table first + self.create_geography_table(table_name) + + # Create Beam Rows with GeographyType + row_elements = [ + beam.Row(id=1, location='POINT(1 1)', optional_location='POINT(2 2)'), + beam.Row( + id=2, location='LINESTRING(0 0, 1 1, 2 2)', optional_location=None), + beam.Row( + id=3, + location='POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + optional_location='POINT(3 3)') + ] + + # Expected data for verification + expected_data = [(1, 'POINT(1 1)', 'POINT(2 2)'), + (2, 'LINESTRING(0 0, 1 1, 2 2)', None), + (3, 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'POINT(3 3)')] + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query=( + "SELECT id, location, optional_location FROM %s ORDER BY id" % + table_id), + data=expected_data) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateRows' >> beam.Create(row_elements) + | 'ConvertToDict' >> beam.Map( + lambda row: { + 'id': row.id, 'location': row.location, + 'optional_location': row.optional_location + }) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + method=WriteToBigQuery.Method.STREAMING_INSERTS, + schema={ + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, + { + "name": "location", + "type": "GEOGRAPHY", + "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] + })) + + # Wait a bit for streaming inserts to complete + time.sleep(5) + + # Verify the data was written correctly + hc.assert_that(None, hc.all_of(*pipeline_verifiers)) + + @pytest.mark.it_postcommit + def test_geography_repeated_fields(self): + """Test GEOGRAPHY fields with REPEATED mode.""" + table_name = 'geography_repeated' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + input_data = [ + { + 'id': 1, + 'location': 'POINT(0 0)', + 'optional_location': 'POINT(1 1)', + 'path': ['POINT(0 0)', 'POINT(1 1)', 'POINT(2 2)'] + }, + { + 'id': 2, + 'location': 'POINT(10 10)', + 'optional_location': None, + 'path': ['LINESTRING(0 0, 5 5)', 'LINESTRING(5 5, 10 10)'] + }, + { + 'id': 3, + 'location': 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'optional_location': 'POINT(0.5 0.5)', + 'path': [] # Empty array + } + ] + + table_schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }, { + "name": "path", "type": "GEOGRAPHY", "mode": "REPEATED" + }] + } + + # Write data + args = self.test_pipeline.get_full_options_as_args() + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STREAMING_INSERTS)) + + # Read and verify + with beam.Pipeline(argv=args) as p: + result = ( + p + | 'ReadFromBQ' >> ReadFromBigQuery( + table=table_id, + method=ReadFromBigQuery.Method.DIRECT_READ, + project=self.project) + | 'ExtractData' >> beam.Map( + lambda row: (row['id'], len(row['path']) if row['path'] else 0))) + + expected_counts = [(1, 3), (2, 2), (3, 0)] + assert_that(result, equal_to(expected_counts)) + + @pytest.mark.it_postcommit + def test_geography_complex_geometries(self): + """Test complex GEOGRAPHY geometries and edge cases.""" + table_name = 'geography_complex' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + # Complex geometries including collections and high precision coordinates + input_data = [ + { + 'id': 1, + 'location': ( + 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))'), + 'optional_location': None + }, + { + 'id': 2, + 'location': ( + 'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ' + '((2 2, 3 2, 3 3, 2 3, 2 2)))'), # Fixed orientation + 'optional_location': ('POINT(-122.419416 37.774929)' + ) # High precision + }, + { + 'id': 3, + 'location': ('POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))' + ), # Simple polygon without holes + 'optional_location': ('LINESTRING(-122 37, -121 38)' + ) # Fixed non-antipodal coordinates + } + ] + + table_schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] + } + + expected_data = [(1, 'LINESTRING(4 6, 7 10)', None), + ( + 2, + 'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ' + '((2 2, 3 2, 3 3, 2 3, 2 2)))', + 'POINT(-122.419416 37.774929)'), + ( + 3, + 'POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))', + 'LINESTRING(-122 37, -121 38)')] + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query=( + "SELECT id, location, optional_location FROM %s ORDER BY id" % + table_id), + data=expected_data) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STREAMING_INSERTS)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + @pytest.mark.uses_gcp_java_expansion_service + @pytest.mark.it_postcommit + def test_geography_storage_write_api(self): + """Test GEOGRAPHY with Storage Write API method.""" + table_name = 'geography_storage_write' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + input_data = [{ + 'id': 1, 'location': 'POINT(0 0)', 'optional_location': 'POINT(1 1)' + }, + { + 'id': 2, + 'location': 'LINESTRING(0 0, 1 1)', + 'optional_location': None + }] + + table_schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] + } + + expected_data = [(1, 'POINT(0 0)', 'POINT(1 1)'), + (2, 'LINESTRING(0 0, 1 1)', None)] + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query=( + "SELECT id, location, optional_location FROM %s ORDER BY id" % + table_id), + data=expected_data) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STORAGE_WRITE_API)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + @pytest.mark.it_postcommit + def test_geography_file_loads_method(self): + """Test GEOGRAPHY with FILE_LOADS method.""" + table_name = 'geography_file_loads' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + input_data = [ + { + 'id': i, + 'location': f'POINT({i} {i})', + 'optional_location': ( + f'POINT({i+10} {i+10})' if i % 2 == 0 else None) + } for i in range(1, 11) # 10 records + ] + + table_schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] + } + + # Verify count and some sample data + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT COUNT(*) as count FROM %s" % table_id, + data=[(10, )]) + ] + + args = self.test_pipeline.get_full_options_as_args() + gcs_temp_location = ( + f'gs://temp-storage-for-end-to-end-tests/' + f'bq_it_test_{int(time.time())}') + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.FILE_LOADS, + custom_gcs_temp_location=gcs_temp_location)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index beb373a7dea3..54c7ca90f011 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -47,7 +47,8 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - "TIMESTAMP": apache_beam.utils.timestamp.Timestamp + "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, + "GEOGRAPHY": str, #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 7ae49dff205d..0eb3351ee84c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -21,6 +21,7 @@ import mock import numpy as np +import apache_beam as beam import apache_beam.io.gcp.bigquery from apache_beam.io.gcp import bigquery_schema_tools from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper @@ -209,6 +210,133 @@ def test_unsupported_query_direct_read(self): query='SELECT name FROM dataset.sample_table', output_type='BEAM_ROW') - if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + def test_geography_type_support(self): + """Test that GEOGRAPHY type is properly supported in schema conversion.""" + fields = [ + bigquery.TableFieldSchema( + name='location', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='locations', type='GEOGRAPHY', mode="REPEATED"), + bigquery.TableFieldSchema( + name='required_location', type='GEOGRAPHY', mode="REQUIRED") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema( + the_table_schema=schema) + + expected_annotations = { + 'location': typing.Optional[str], + 'locations': typing.Sequence[str], + 'required_location': str + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + def test_geography_in_bq_to_python_types_mapping(self): + """Test that GEOGRAPHY is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" + from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES + + self.assertIn("GEOGRAPHY", BIG_QUERY_TO_PYTHON_TYPES) + self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["GEOGRAPHY"], str) + + def test_geography_field_type_conversion(self): + """Test bq_field_to_type function with GEOGRAPHY fields.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # Test required GEOGRAPHY field + result = bq_field_to_type("GEOGRAPHY", "REQUIRED") + self.assertEqual(result, str) + + # Test nullable GEOGRAPHY field + result = bq_field_to_type("GEOGRAPHY", "NULLABLE") + self.assertEqual(result, typing.Optional[str]) + + # Test repeated GEOGRAPHY field + result = bq_field_to_type("GEOGRAPHY", "REPEATED") + self.assertEqual(result, typing.Sequence[str]) + + # Test GEOGRAPHY field with None mode (should default to nullable) + result = bq_field_to_type("GEOGRAPHY", None) + self.assertEqual(result, typing.Optional[str]) + + # Test GEOGRAPHY field with empty mode (should default to nullable) + result = bq_field_to_type("GEOGRAPHY", "") + self.assertEqual(result, typing.Optional[str]) + + def test_convert_to_usertype_with_geography(self): + """Test convert_to_usertype function with GEOGRAPHY fields.""" + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='location', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='name', type='STRING', mode="REQUIRED") + ]) + + conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) + + # Verify the transform is created successfully + self.assertIsNotNone(conversion_transform) + + # The transform should be a ParDo with BeamSchemaConversionDoFn + self.assertIsInstance(conversion_transform, beam.ParDo) + + def test_beam_schema_conversion_dofn_with_geography(self): + """Test BeamSchemaConversionDoFn with GEOGRAPHY data.""" + from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn + + # Create a user type with GEOGRAPHY field + fields = [ + bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='location', type='GEOGRAPHY', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # Create the DoFn + dofn = BeamSchemaConversionDoFn(usertype) + + # Test processing a dictionary with GEOGRAPHY data + input_dict = {'id': 1, 'location': 'POINT(30 10)'} + + results = list(dofn.process(input_dict)) + self.assertEqual(len(results), 1) + + result = results[0] + self.assertEqual(result.id, 1) + self.assertEqual(result.location, 'POINT(30 10)') + + def test_geography_with_complex_wkt(self): + """Test GEOGRAPHY type with complex Well-Known Text geometries.""" + fields = [ + bigquery.TableFieldSchema( + name='simple_point', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='linestring', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='polygon', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='multigeometry', type='GEOGRAPHY', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # All GEOGRAPHY fields should map to Optional[str] + expected_annotations = { + 'simple_point': typing.Optional[str], + 'linestring': typing.Optional[str], + 'polygon': typing.Optional[str], + 'multigeometry': typing.Optional[str] + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index d2fa7627a800..36a1015e3d27 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -121,6 +121,7 @@ "FLOAT": np.float64, "NUMERIC": decimal.Decimal, "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, + "GEOGRAPHY": str, } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 1101317439a9..066fc8985547 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1092,6 +1092,160 @@ def test_typehints_from_schema_with_repeated_struct(self): self.assertEqual(typehints, expected_typehints) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestGeographyTypeSupport(unittest.TestCase): + """Tests for GEOGRAPHY data type support in BigQuery.""" + def test_geography_in_bigquery_type_mapping(self): + """Test that GEOGRAPHY is properly mapped in type mapping.""" + from apache_beam.io.gcp.bigquery_tools import BIGQUERY_TYPE_TO_PYTHON_TYPE + + self.assertIn("GEOGRAPHY", BIGQUERY_TYPE_TO_PYTHON_TYPE) + self.assertEqual(BIGQUERY_TYPE_TO_PYTHON_TYPE["GEOGRAPHY"], str) + + def test_geography_field_conversion(self): + """Test that GEOGRAPHY fields are converted correctly.""" + from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper + + # Create a mock field with GEOGRAPHY type + field = bigquery.TableFieldSchema() + field.type = 'GEOGRAPHY' + field.name = 'location' + field.mode = 'NULLABLE' + + wrapper = BigQueryWrapper() + + # Test various WKT formats + test_cases = [ + "POINT(30 10)", + "LINESTRING(30 10, 10 30, 40 40)", + "POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))", + "MULTIPOINT((10 40), (40 30), (20 20), (30 10))", + "GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))" + ] + + for wkt_value in test_cases: + result = wrapper._convert_cell_value_to_dict(wkt_value, field) + self.assertEqual(result, wkt_value) + self.assertIsInstance(result, str) + + def test_geography_typehints_from_schema(self): + """Test that GEOGRAPHY fields generate correct type hints.""" + schema = { + "fields": [{ + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }, { + "name": "locations", + "type": "GEOGRAPHY", + "mode": "REPEATED" + }] + } + + typehints = get_beam_typehints_from_tableschema(schema) + + expected_typehints = [("location", str), + ("optional_location", Optional[str]), + ("locations", Sequence[str])] + + self.assertEqual(typehints, expected_typehints) + + def test_geography_beam_row_conversion(self): + """Test converting dictionary with GEOGRAPHY to Beam Row.""" + schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE" + }, { + "name": "name", "type": "STRING", "mode": "REQUIRED" + }] + } + + row_dict = {"id": 1, "location": "POINT(30 10)", "name": "Test Location"} + + beam_row = beam_row_from_dict(row_dict, schema) + + self.assertEqual(beam_row.id, 1) + self.assertEqual(beam_row.location, "POINT(30 10)") + self.assertEqual(beam_row.name, "Test Location") + + def test_geography_beam_row_conversion_with_null(self): + """Test converting dictionary with null GEOGRAPHY to Beam Row.""" + schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE" + }] + } + + row_dict = {"id": 1, "location": None} + + beam_row = beam_row_from_dict(row_dict, schema) + + self.assertEqual(beam_row.id, 1) + self.assertIsNone(beam_row.location) + + def test_geography_beam_row_conversion_repeated(self): + """Test converting dictionary with repeated GEOGRAPHY to Beam Row.""" + schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "locations", "type": "GEOGRAPHY", "mode": "REPEATED" + }] + } + + row_dict = { + "id": 1, + "locations": ["POINT(30 10)", "POINT(40 20)", "LINESTRING(0 0, 1 1)"] + } + + beam_row = beam_row_from_dict(row_dict, schema) + + self.assertEqual(beam_row.id, 1) + self.assertEqual(len(beam_row.locations), 3) + self.assertEqual(beam_row.locations[0], "POINT(30 10)") + self.assertEqual(beam_row.locations[1], "POINT(40 20)") + self.assertEqual(beam_row.locations[2], "LINESTRING(0 0, 1 1)") + + def test_geography_json_encoding(self): + """Test that GEOGRAPHY values are properly JSON encoded.""" + coder = RowAsDictJsonCoder() + + row_with_geography = {"id": 1, "location": "POINT(30 10)", "name": "Test"} + + encoded = coder.encode(row_with_geography) + decoded = coder.decode(encoded) + + self.assertEqual(decoded["location"], "POINT(30 10)") + self.assertIsInstance(decoded["location"], str) + + def test_geography_with_special_characters(self): + """Test GEOGRAPHY values with special characters and geometries.""" + from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper + + field = bigquery.TableFieldSchema() + field.type = 'GEOGRAPHY' + field.name = 'complex_geo' + field.mode = 'NULLABLE' + + wrapper = BigQueryWrapper() + + # Test complex WKT with various coordinate systems and precision + complex_wkt = ( + "POLYGON((-122.4194 37.7749, -122.4094 37.7849, " + "-122.3994 37.7749, -122.4194 37.7749))") + + result = wrapper._convert_cell_value_to_dict(complex_wkt, field) + self.assertEqual(result, complex_wkt) + self.assertIsInstance(result, str) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()