From 975baf8b134197e0d49eed48fad36b07656b8b36 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 11 Sep 2025 14:30:02 -0400 Subject: [PATCH 01/15] feat(bigquery): add GEOGRAPHY type support for BigQuery I/O Add support for BigQuery GEOGRAPHY type which works with Well-Known Text (WKT) format. The change includes: - Adding GEOGRAPHY to type mappings in bigquery_tools and bigquery_schema_tools - Implementing GeographyType logical type in schemas.py - Adding comprehensive tests for GEOGRAPHY type conversion and schema integration --- .../io/gcp/bigquery_schema_tools.py | 3 +- .../io/gcp/bigquery_schema_tools_test.py | 133 ++++++++++++++- .../apache_beam/io/gcp/bigquery_tools.py | 1 + .../apache_beam/io/gcp/bigquery_tools_test.py | 154 ++++++++++++++++++ sdks/python/apache_beam/typehints/schemas.py | 41 +++++ .../apache_beam/typehints/schemas_test.py | 106 ++++++++++++ 6 files changed, 434 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py index beb373a7dea3..54c7ca90f011 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py @@ -47,7 +47,8 @@ "FLOAT": np.float64, "BOOLEAN": bool, "BYTES": bytes, - "TIMESTAMP": apache_beam.utils.timestamp.Timestamp + "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, + "GEOGRAPHY": str, #TODO(https://github.com/apache/beam/issues/20810): # Finish mappings for all BQ types } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 7ae49dff205d..2ad94c7057b6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -209,6 +209,133 @@ def test_unsupported_query_direct_read(self): query='SELECT name FROM dataset.sample_table', output_type='BEAM_ROW') - if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() + def test_geography_type_support(self): + """Test that GEOGRAPHY type is properly supported in schema conversion.""" + fields = [ + bigquery.TableFieldSchema( + name='location', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='locations', type='GEOGRAPHY', mode="REPEATED"), + bigquery.TableFieldSchema( + name='required_location', type='GEOGRAPHY', mode="REQUIRED") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema( + the_table_schema=schema) + + expected_annotations = { + 'location': typing.Optional[str], + 'locations': typing.Sequence[str], + 'required_location': str + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + def test_geography_in_bq_to_python_types_mapping(self): + """Test that GEOGRAPHY is included in BIG_QUERY_TO_PYTHON_TYPES mapping.""" + from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES + + self.assertIn("GEOGRAPHY", BIG_QUERY_TO_PYTHON_TYPES) + self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["GEOGRAPHY"], str) + + def test_geography_field_type_conversion(self): + """Test bq_field_to_type function with GEOGRAPHY fields.""" + from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type + + # Test required GEOGRAPHY field + result = bq_field_to_type("GEOGRAPHY", "REQUIRED") + self.assertEqual(result, str) + + # Test nullable GEOGRAPHY field + result = bq_field_to_type("GEOGRAPHY", "NULLABLE") + self.assertEqual(result, typing.Optional[str]) + + # Test repeated GEOGRAPHY field + result = bq_field_to_type("GEOGRAPHY", "REPEATED") + self.assertEqual(result, typing.Sequence[str]) + + # Test GEOGRAPHY field with None mode (should default to nullable) + result = bq_field_to_type("GEOGRAPHY", None) + self.assertEqual(result, typing.Optional[str]) + + # Test GEOGRAPHY field with empty mode (should default to nullable) + result = bq_field_to_type("GEOGRAPHY", "") + self.assertEqual(result, typing.Optional[str]) + + def test_convert_to_usertype_with_geography(self): + """Test convert_to_usertype function with GEOGRAPHY fields.""" + schema = bigquery.TableSchema( + fields=[ + bigquery.TableFieldSchema( + name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='location', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='name', type='STRING', mode="REQUIRED") + ]) + + conversion_transform = bigquery_schema_tools.convert_to_usertype(schema) + + # Verify the transform is created successfully + self.assertIsNotNone(conversion_transform) + + # The transform should be a ParDo with BeamSchemaConversionDoFn + self.assertIsInstance(conversion_transform, beam.ParDo) + + def test_beam_schema_conversion_dofn_with_geography(self): + """Test BeamSchemaConversionDoFn with GEOGRAPHY data.""" + from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn + + # Create a user type with GEOGRAPHY field + fields = [ + bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"), + bigquery.TableFieldSchema( + name='location', type='GEOGRAPHY', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # Create the DoFn + dofn = BeamSchemaConversionDoFn(usertype) + + # Test processing a dictionary with GEOGRAPHY data + input_dict = {'id': 1, 'location': 'POINT(30 10)'} + + results = list(dofn.process(input_dict)) + self.assertEqual(len(results), 1) + + result = results[0] + self.assertEqual(result.id, 1) + self.assertEqual(result.location, 'POINT(30 10)') + + def test_geography_with_complex_wkt(self): + """Test GEOGRAPHY type with complex Well-Known Text geometries.""" + fields = [ + bigquery.TableFieldSchema( + name='simple_point', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='linestring', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='polygon', type='GEOGRAPHY', mode="NULLABLE"), + bigquery.TableFieldSchema( + name='multigeometry', type='GEOGRAPHY', mode="NULLABLE") + ] + schema = bigquery.TableSchema(fields=fields) + + usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema) + + # All GEOGRAPHY fields should map to Optional[str] + expected_annotations = { + 'simple_point': typing.Optional[str], + 'linestring': typing.Optional[str], + 'polygon': typing.Optional[str], + 'multigeometry': typing.Optional[str] + } + + self.assertEqual(usertype.__annotations__, expected_annotations) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 738d6e9c70f3..2a696be8b14f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -121,6 +121,7 @@ "FLOAT": np.float64, "NUMERIC": decimal.Decimal, "TIMESTAMP": apache_beam.utils.timestamp.Timestamp, + "GEOGRAPHY": str, } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 1320ced1dee5..7ea1136aa857 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1064,6 +1064,160 @@ def test_typehints_from_schema_with_repeated_struct(self): self.assertEqual(typehints, expected_typehints) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestGeographyTypeSupport(unittest.TestCase): + """Tests for GEOGRAPHY data type support in BigQuery.""" + def test_geography_in_bigquery_type_mapping(self): + """Test that GEOGRAPHY is properly mapped in BIGQUERY_TYPE_TO_PYTHON_TYPE.""" + from apache_beam.io.gcp.bigquery_tools import BIGQUERY_TYPE_TO_PYTHON_TYPE + + self.assertIn("GEOGRAPHY", BIGQUERY_TYPE_TO_PYTHON_TYPE) + self.assertEqual(BIGQUERY_TYPE_TO_PYTHON_TYPE["GEOGRAPHY"], str) + + def test_geography_field_conversion(self): + """Test that GEOGRAPHY fields are converted correctly.""" + from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper + from apache_beam.io.gcp.internal.clients import bigquery + + # Create a mock field with GEOGRAPHY type + field = bigquery.TableFieldSchema() + field.type = 'GEOGRAPHY' + field.name = 'location' + field.mode = 'NULLABLE' + + wrapper = BigQueryWrapper() + + # Test various WKT formats + test_cases = [ + "POINT(30 10)", + "LINESTRING(30 10, 10 30, 40 40)", + "POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))", + "MULTIPOINT((10 40), (40 30), (20 20), (30 10))", + "GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))" + ] + + for wkt_value in test_cases: + result = wrapper._convert_cell_value_to_dict(wkt_value, field) + self.assertEqual(result, wkt_value) + self.assertIsInstance(result, str) + + def test_geography_typehints_from_schema(self): + """Test that GEOGRAPHY fields generate correct type hints.""" + schema = { + "fields": [{ + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }, { + "name": "locations", + "type": "GEOGRAPHY", + "mode": "REPEATED" + }] + } + + typehints = get_beam_typehints_from_tableschema(schema) + + expected_typehints = [("location", str), + ("optional_location", Optional[str]), + ("locations", Sequence[str])] + + self.assertEqual(typehints, expected_typehints) + + def test_geography_beam_row_conversion(self): + """Test converting dictionary with GEOGRAPHY to Beam Row.""" + schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE" + }, { + "name": "name", "type": "STRING", "mode": "REQUIRED" + }] + } + + row_dict = {"id": 1, "location": "POINT(30 10)", "name": "Test Location"} + + beam_row = beam_row_from_dict(row_dict, schema) + + self.assertEqual(beam_row.id, 1) + self.assertEqual(beam_row.location, "POINT(30 10)") + self.assertEqual(beam_row.name, "Test Location") + + def test_geography_beam_row_conversion_with_null(self): + """Test converting dictionary with null GEOGRAPHY to Beam Row.""" + schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "NULLABLE" + }] + } + + row_dict = {"id": 1, "location": None} + + beam_row = beam_row_from_dict(row_dict, schema) + + self.assertEqual(beam_row.id, 1) + self.assertIsNone(beam_row.location) + + def test_geography_beam_row_conversion_repeated(self): + """Test converting dictionary with repeated GEOGRAPHY to Beam Row.""" + schema = { + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "locations", "type": "GEOGRAPHY", "mode": "REPEATED" + }] + } + + row_dict = { + "id": 1, + "locations": ["POINT(30 10)", "POINT(40 20)", "LINESTRING(0 0, 1 1)"] + } + + beam_row = beam_row_from_dict(row_dict, schema) + + self.assertEqual(beam_row.id, 1) + self.assertEqual(len(beam_row.locations), 3) + self.assertEqual(beam_row.locations[0], "POINT(30 10)") + self.assertEqual(beam_row.locations[1], "POINT(40 20)") + self.assertEqual(beam_row.locations[2], "LINESTRING(0 0, 1 1)") + + def test_geography_json_encoding(self): + """Test that GEOGRAPHY values are properly JSON encoded.""" + coder = RowAsDictJsonCoder() + + row_with_geography = {"id": 1, "location": "POINT(30 10)", "name": "Test"} + + encoded = coder.encode(row_with_geography) + decoded = coder.decode(encoded) + + self.assertEqual(decoded["location"], "POINT(30 10)") + self.assertIsInstance(decoded["location"], str) + + def test_geography_with_special_characters(self): + """Test GEOGRAPHY values with special characters and complex geometries.""" + from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper + from apache_beam.io.gcp.internal.clients import bigquery + + field = bigquery.TableFieldSchema() + field.type = 'GEOGRAPHY' + field.name = 'complex_geo' + field.mode = 'NULLABLE' + + wrapper = BigQueryWrapper() + + # Test complex WKT with various coordinate systems and precision + complex_wkt = "POLYGON((-122.4194 37.7749, -122.4094 37.7849, -122.3994 37.7749, -122.4194 37.7749))" + + result = wrapper._convert_cell_value_to_dict(complex_wkt, field) + self.assertEqual(result, complex_wkt) + self.assertIsInstance(result, str) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index c21dde426fc7..bcd2e253b4f5 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -1061,6 +1061,47 @@ def _from_typing(cls, typ): return cls() +@LogicalType.register_logical_type +class GeographyType(LogicalType[str, str, str]): + """ + For internal use only; no backwards-compatibility guarantees. + + Support for BigQuery GEOGRAPHY logical type. GEOGRAPHY data type works with + Well-Known Text (WKT) format for reading and writing to BigQuery. + """ + def __init__(self, argument=""): + pass + + @classmethod + def representation_type(cls) -> type: + return str + + @classmethod + def urn(cls): + return "beam:logical_type:geography:v1" + + @classmethod + def language_type(cls): + return str + + def to_representation_type(self, value: str) -> str: + return value + + def to_language_type(self, value: str) -> str: + return value + + @classmethod + def argument_type(cls): + return str + + def argument(self): + return "" + + @classmethod + def _from_typing(cls, typ): + return cls() + + # TODO(yathu,BEAM-10722): Investigate and resolve conflicts in logical type # registration when more than one logical types sharing the same language type LogicalType.register_logical_type(DecimalLogicalType) diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 6cf37322147e..175f44545d4d 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -600,6 +600,112 @@ def test_python_callable_maps_to_logical_type(self): schema_registry=SchemaTypeRegistry()), PythonCallableWithSource) + def test_geography_type_maps_to_logical_type(self): + from apache_beam.typehints.schemas import GeographyType + self.assertEqual( + schema_pb2.FieldType( + logical_type=schema_pb2.LogicalType( + urn="beam:logical_type:geography:v1", + representation=typing_to_runner_api(str))), + typing_to_runner_api(GeographyType)) + self.assertEqual( + typing_from_runner_api( + schema_pb2.FieldType( + logical_type=schema_pb2.LogicalType( + urn="beam:logical_type:geography:v1", + representation=typing_to_runner_api(str))), + schema_registry=SchemaTypeRegistry()), + GeographyType) + + def test_optional_geography_type_maps_to_logical_type(self): + from apache_beam.typehints.schemas import GeographyType + self.assertEqual( + schema_pb2.FieldType( + nullable=True, + logical_type=schema_pb2.LogicalType( + urn="beam:logical_type:geography:v1", + representation=typing_to_runner_api(str))), + typing_to_runner_api(Optional[GeographyType])) + self.assertEqual( + typing_from_runner_api( + schema_pb2.FieldType( + nullable=True, + logical_type=schema_pb2.LogicalType( + urn="beam:logical_type:geography:v1", + representation=typing_to_runner_api(str))), + schema_registry=SchemaTypeRegistry()), + Optional[GeographyType]) + + def test_geography_type_instantiation(self): + from apache_beam.typehints.schemas import GeographyType + + # Test basic instantiation + gt = GeographyType() + self.assertEqual(gt.urn(), "beam:logical_type:geography:v1") + self.assertEqual(gt.language_type(), str) + self.assertEqual(gt.representation_type(), str) + self.assertEqual(gt.argument_type(), str) + self.assertEqual(gt.argument(), "") + + # Test instantiation with argument + gt_with_arg = GeographyType("test_arg") + self.assertEqual(gt_with_arg.argument(), "") + + def test_geography_type_conversion_methods(self): + from apache_beam.typehints.schemas import GeographyType + + gt = GeographyType() + + # Test various WKT formats + test_cases = [ + "POINT(30 10)", + "LINESTRING(30 10, 10 30, 40 40)", + "POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))", + "MULTIPOINT((10 40), (40 30), (20 20), (30 10))", + "MULTILINESTRING((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))", + "GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))" + ] + + for wkt in test_cases: + # Test to_representation_type + result = gt.to_representation_type(wkt) + self.assertEqual(result, wkt) + self.assertIsInstance(result, str) + + # Test to_language_type + result = gt.to_language_type(wkt) + self.assertEqual(result, wkt) + self.assertIsInstance(result, str) + + def test_geography_type_from_typing(self): + from apache_beam.typehints.schemas import GeographyType + + # Test _from_typing class method + gt = GeographyType._from_typing(str) + self.assertIsInstance(gt, GeographyType) + self.assertEqual(gt.urn(), "beam:logical_type:geography:v1") + + def test_geography_type_schema_integration(self): + from apache_beam.typehints.schemas import GeographyType, named_fields_to_schema + + # Test schema creation with GeographyType + fields = [('location', GeographyType), ('name', str)] + schema = named_fields_to_schema(fields) + + self.assertEqual(len(schema.fields), 2) + + # Check geography field + geo_field = schema.fields[0] + self.assertEqual(geo_field.name, 'location') + self.assertTrue(geo_field.type.HasField('logical_type')) + self.assertEqual( + geo_field.type.logical_type.urn, "beam:logical_type:geography:v1") + + # Check string field + str_field = schema.fields[1] + self.assertEqual(str_field.name, 'name') + self.assertEqual(str_field.type.atomic_type, schema_pb2.STRING) + def test_trivial_example(self): MyCuteClass = NamedTuple( 'MyCuteClass', From f2fe2f77b8445245541733e660a9e3c50b9c2e16 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 11 Sep 2025 16:27:19 -0400 Subject: [PATCH 02/15] fixed tests --- sdks/python/apache_beam/io/gcp/bigquery_tools_test.py | 10 +++++----- sdks/python/apache_beam/typehints/schemas.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 7ea1136aa857..ba7c0d380182 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -1068,7 +1068,7 @@ def test_typehints_from_schema_with_repeated_struct(self): class TestGeographyTypeSupport(unittest.TestCase): """Tests for GEOGRAPHY data type support in BigQuery.""" def test_geography_in_bigquery_type_mapping(self): - """Test that GEOGRAPHY is properly mapped in BIGQUERY_TYPE_TO_PYTHON_TYPE.""" + """Test that GEOGRAPHY is properly mapped in type mapping.""" from apache_beam.io.gcp.bigquery_tools import BIGQUERY_TYPE_TO_PYTHON_TYPE self.assertIn("GEOGRAPHY", BIGQUERY_TYPE_TO_PYTHON_TYPE) @@ -1077,7 +1077,6 @@ def test_geography_in_bigquery_type_mapping(self): def test_geography_field_conversion(self): """Test that GEOGRAPHY fields are converted correctly.""" from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper - from apache_beam.io.gcp.internal.clients import bigquery # Create a mock field with GEOGRAPHY type field = bigquery.TableFieldSchema() @@ -1199,9 +1198,8 @@ def test_geography_json_encoding(self): self.assertIsInstance(decoded["location"], str) def test_geography_with_special_characters(self): - """Test GEOGRAPHY values with special characters and complex geometries.""" + """Test GEOGRAPHY values with special characters and geometries.""" from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper - from apache_beam.io.gcp.internal.clients import bigquery field = bigquery.TableFieldSchema() field.type = 'GEOGRAPHY' @@ -1211,7 +1209,9 @@ def test_geography_with_special_characters(self): wrapper = BigQueryWrapper() # Test complex WKT with various coordinate systems and precision - complex_wkt = "POLYGON((-122.4194 37.7749, -122.4094 37.7849, -122.3994 37.7749, -122.4194 37.7749))" + complex_wkt = ( + "POLYGON((-122.4194 37.7749, -122.4094 37.7849, " + "-122.3994 37.7749, -122.4194 37.7749))") result = wrapper._convert_cell_value_to_dict(complex_wkt, field) self.assertEqual(result, complex_wkt) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index bcd2e253b4f5..4a5b3e10bf2b 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -350,7 +350,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: try: if LogicalType.is_known_logical_type(type_): - logical_type = type_ + logical_type = type_() else: logical_type = LogicalType.from_typing(type_) except ValueError: From 0726056c936eb626b313aec4142caf2a27d193ca Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 11 Sep 2025 18:00:53 -0400 Subject: [PATCH 03/15] tests --- .../io/gcp/bigquery_schema_tools_test.py | 1 + .../apache_beam/typehints/schemas_test.py | 20 +++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py index 2ad94c7057b6..0eb3351ee84c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py @@ -21,6 +21,7 @@ import mock import numpy as np +import apache_beam as beam import apache_beam.io.gcp.bigquery from apache_beam.io.gcp import bigquery_schema_tools from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 175f44545d4d..93f19db38e40 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -606,14 +606,20 @@ def test_geography_type_maps_to_logical_type(self): schema_pb2.FieldType( logical_type=schema_pb2.LogicalType( urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str))), + representation=typing_to_runner_api(str), + argument_type=typing_to_runner_api(str), + argument=schema_pb2.FieldValue( + atomic_value=schema_pb2.AtomicValue(string_value="")))), typing_to_runner_api(GeographyType)) self.assertEqual( typing_from_runner_api( schema_pb2.FieldType( logical_type=schema_pb2.LogicalType( urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str))), + representation=typing_to_runner_api(str), + argument_type=typing_to_runner_api(str), + argument=schema_pb2.FieldValue( + atomic_value=schema_pb2.AtomicValue(string_value="")))), schema_registry=SchemaTypeRegistry()), GeographyType) @@ -624,7 +630,10 @@ def test_optional_geography_type_maps_to_logical_type(self): nullable=True, logical_type=schema_pb2.LogicalType( urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str))), + representation=typing_to_runner_api(str), + argument_type=typing_to_runner_api(str), + argument=schema_pb2.FieldValue( + atomic_value=schema_pb2.AtomicValue(string_value="")))), typing_to_runner_api(Optional[GeographyType])) self.assertEqual( typing_from_runner_api( @@ -632,7 +641,10 @@ def test_optional_geography_type_maps_to_logical_type(self): nullable=True, logical_type=schema_pb2.LogicalType( urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str))), + representation=typing_to_runner_api(str), + argument_type=typing_to_runner_api(str), + argument=schema_pb2.FieldValue( + atomic_value=schema_pb2.AtomicValue(string_value="")))), schema_registry=SchemaTypeRegistry()), Optional[GeographyType]) From 204e03749ef559bf2355806bcf76674a00d64be4 Mon Sep 17 00:00:00 2001 From: liferoad Date: Thu, 11 Sep 2025 20:23:21 -0400 Subject: [PATCH 04/15] fixed tests --- sdks/python/apache_beam/typehints/schemas_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 93f19db38e40..23de5619edca 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -609,7 +609,7 @@ def test_geography_type_maps_to_logical_type(self): representation=typing_to_runner_api(str), argument_type=typing_to_runner_api(str), argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicValue(string_value="")))), + atomic_value=schema_pb2.AtomicTypeValue(string="")))), typing_to_runner_api(GeographyType)) self.assertEqual( typing_from_runner_api( @@ -619,7 +619,7 @@ def test_geography_type_maps_to_logical_type(self): representation=typing_to_runner_api(str), argument_type=typing_to_runner_api(str), argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicValue(string_value="")))), + atomic_value=schema_pb2.AtomicTypeValue(string="")))), schema_registry=SchemaTypeRegistry()), GeographyType) @@ -633,7 +633,7 @@ def test_optional_geography_type_maps_to_logical_type(self): representation=typing_to_runner_api(str), argument_type=typing_to_runner_api(str), argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicValue(string_value="")))), + atomic_value=schema_pb2.AtomicTypeValue(string="")))), typing_to_runner_api(Optional[GeographyType])) self.assertEqual( typing_from_runner_api( @@ -644,7 +644,7 @@ def test_optional_geography_type_maps_to_logical_type(self): representation=typing_to_runner_api(str), argument_type=typing_to_runner_api(str), argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicValue(string_value="")))), + atomic_value=schema_pb2.AtomicTypeValue(string="")))), schema_registry=SchemaTypeRegistry()), Optional[GeographyType]) From 28e13ef8c6c4f96417d38d32325e077da50969aa Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 10:23:57 -0400 Subject: [PATCH 05/15] fixes language_type --- sdks/python/apache_beam/typehints/schemas.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 4a5b3e10bf2b..e0bff36dad8a 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -552,8 +552,9 @@ def typing_from_runner_api( if fieldtype_proto.logical_type.urn == PYTHON_ANY_URN: return Any else: - return LogicalType.from_runner_api( - fieldtype_proto.logical_type).language_type() + logical_type_instance = LogicalType.from_runner_api( + fieldtype_proto.logical_type) + return type(logical_type_instance) elif type_info == "iterable_type": return Sequence[self.typing_from_runner_api( From b399fc5172c83d9e7dbead538efeff9263ba3ab9 Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 11:53:26 -0400 Subject: [PATCH 06/15] fixed logical type --- sdks/python/apache_beam/typehints/schemas.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index e0bff36dad8a..e2d4b195c1ce 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -554,7 +554,12 @@ def typing_from_runner_api( else: logical_type_instance = LogicalType.from_runner_api( fieldtype_proto.logical_type) - return type(logical_type_instance) + # Special case for GeographyType: return the logical type class itself + # instead of the language_type to maintain semantic meaning + if fieldtype_proto.logical_type.urn == "beam:logical_type:geography:v1": + return type(logical_type_instance) + else: + return logical_type_instance.language_type() elif type_info == "iterable_type": return Sequence[self.typing_from_runner_api( From 9ff2bbdcc989794f79e185c2c0b77f40d467c921 Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 11:56:56 -0400 Subject: [PATCH 07/15] urns --- sdks/python/apache_beam/typehints/schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index e2d4b195c1ce..139bb828e870 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -556,7 +556,7 @@ def typing_from_runner_api( fieldtype_proto.logical_type) # Special case for GeographyType: return the logical type class itself # instead of the language_type to maintain semantic meaning - if fieldtype_proto.logical_type.urn == "beam:logical_type:geography:v1": + if fieldtype_proto.logical_type.urn == GeographyType.urn(): return type(logical_type_instance) else: return logical_type_instance.language_type() From 0fdb05f75b2508982a947e7a78e4bf3a9649bc2a Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 16:37:54 -0400 Subject: [PATCH 08/15] add BQ IT --- .../trigger_files/beam_PostCommit_Python.json | 2 +- .../io/gcp/bigquery_geography_it_test.py | 467 ++++++++++++++++++ 2 files changed, 468 insertions(+), 1 deletion(-) create mode 100644 sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 00e0c3c25433..8675e9535061 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 27 + "modification": 28 } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py new file mode 100644 index 000000000000..e71f07eae9ae --- /dev/null +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for BigQuery GEOGRAPHY data type support.""" + +import logging +import secrets +import time +import unittest + +import hamcrest as hc +import pytest + +import apache_beam as beam +from apache_beam.io.gcp.bigquery import WriteToBigQuery +from apache_beam.io.gcp.bigquery import ReadFromBigQuery +from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper +from apache_beam.io.gcp.internal.clients import bigquery +from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.typehints.schemas import GeographyType + +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None + +_LOGGER = logging.getLogger(__name__) + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class BigQueryGeographyIntegrationTests(unittest.TestCase): + """Integration tests for BigQuery GEOGRAPHY data type.""" + + BIG_QUERY_DATASET_ID = 'python_geography_it_test_' + + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + self.runner_name = type(self.test_pipeline.runner).__name__ + self.project = self.test_pipeline.get_option('project') + + self.bigquery_client = BigQueryWrapper() + self.dataset_id = '%s%d%s' % ( + self.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3)) + self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id) + _LOGGER.info( + "Created dataset %s in project %s", self.dataset_id, self.project) + + def tearDown(self): + request = bigquery.BigqueryDatasetsDeleteRequest( + projectId=self.project, datasetId=self.dataset_id, deleteContents=True) + try: + _LOGGER.info( + "Deleting dataset %s in project %s", self.dataset_id, self.project) + self.bigquery_client.client.datasets.Delete(request) + except HttpError: + _LOGGER.debug( + 'Failed to clean up dataset %s in project %s', + self.dataset_id, + self.project) + + def create_geography_table(self, table_name, include_repeated=False): + """Create a table with various GEOGRAPHY field configurations.""" + table_schema = bigquery.TableSchema() + + # ID field + id_field = bigquery.TableFieldSchema() + id_field.name = 'id' + id_field.type = 'INTEGER' + id_field.mode = 'REQUIRED' + table_schema.fields.append(id_field) + + # Required GEOGRAPHY field + geo_required = bigquery.TableFieldSchema() + geo_required.name = 'location' + geo_required.type = 'GEOGRAPHY' + geo_required.mode = 'REQUIRED' + table_schema.fields.append(geo_required) + + # Nullable GEOGRAPHY field + geo_nullable = bigquery.TableFieldSchema() + geo_nullable.name = 'optional_location' + geo_nullable.type = 'GEOGRAPHY' + geo_nullable.mode = 'NULLABLE' + table_schema.fields.append(geo_nullable) + + if include_repeated: + # Repeated GEOGRAPHY field + geo_repeated = bigquery.TableFieldSchema() + geo_repeated.name = 'path' + geo_repeated.type = 'GEOGRAPHY' + geo_repeated.mode = 'REPEATED' + table_schema.fields.append(geo_repeated) + + table = bigquery.Table( + tableReference=bigquery.TableReference( + projectId=self.project, + datasetId=self.dataset_id, + tableId=table_name), + schema=table_schema) + request = bigquery.BigqueryTablesInsertRequest( + projectId=self.project, datasetId=self.dataset_id, table=table) + self.bigquery_client.client.tables.Insert(request) + + # Wait for table to be available + _ = self.bigquery_client.get_table(self.project, self.dataset_id, table_name) + + @pytest.mark.it_postcommit + def test_geography_write_and_read_basic_geometries(self): + """Test writing and reading basic GEOGRAPHY geometries.""" + table_name = 'geography_basic_geometries' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + # Test data with various WKT geometry types + input_data = [ + { + 'id': 1, + 'location': 'POINT(30 10)', + 'optional_location': 'POINT(-122.4194 37.7749)' # San Francisco + }, + { + 'id': 2, + 'location': 'LINESTRING(30 10, 10 30, 40 40)', + 'optional_location': None + }, + { + 'id': 3, + 'location': 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', + 'optional_location': 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))' + }, + { + 'id': 4, + 'location': 'MULTIPOINT((10 40), (40 30), (20 20), (30 10))', + 'optional_location': 'POINT(0 0)' + }, + { + 'id': 5, + 'location': 'MULTILINESTRING((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))', + 'optional_location': None + } + ] + + table_schema = { + "fields": [ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, + {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} + ] + } + + # Write data to BigQuery + with TestPipeline(is_integration_test=True) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STREAMING_INSERTS)) + + # Read data back and verify + with TestPipeline(is_integration_test=True) as p: + result = ( + p + | 'ReadFromBQ' >> ReadFromBigQuery( + table=table_id, + method=ReadFromBigQuery.Method.DIRECT_READ) + | 'ExtractGeography' >> beam.Map( + lambda row: (row['id'], row['location'], row['optional_location']))) + + expected_data = [ + (1, 'POINT(30 10)', 'POINT(-122.4194 37.7749)'), + (2, 'LINESTRING(30 10, 10 30, 40 40)', None), + (3, 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), + (4, 'MULTIPOINT((10 40), (40 30), (20 20), (30 10))', 'POINT(0 0)'), + (5, 'MULTILINESTRING((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))', None) + ] + + assert_that(result, equal_to(expected_data)) + + @pytest.mark.it_postcommit + def test_geography_write_with_beam_rows(self): + """Test writing GEOGRAPHY data using Beam Rows with GeographyType.""" + table_name = 'geography_beam_rows' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + # Create Beam Rows with GeographyType + row_elements = [ + beam.Row( + id=1, + location='POINT(1 1)', + optional_location='POINT(2 2)' + ), + beam.Row( + id=2, + location='LINESTRING(0 0, 1 1, 2 2)', + optional_location=None + ), + beam.Row( + id=3, + location='POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + optional_location='POINT(3 3)' + ) + ] + + # Expected data for verification + expected_data = [ + (1, 'POINT(1 1)', 'POINT(2 2)'), + (2, 'LINESTRING(0 0, 1 1, 2 2)', None), + (3, 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'POINT(3 3)') + ] + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT id, location, optional_location FROM %s ORDER BY id" % table_id, + data=expected_data) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateRows' >> beam.Create(row_elements) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + method=WriteToBigQuery.Method.STREAMING_INSERTS)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + @pytest.mark.it_postcommit + def test_geography_repeated_fields(self): + """Test GEOGRAPHY fields with REPEATED mode.""" + table_name = 'geography_repeated' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + input_data = [ + { + 'id': 1, + 'location': 'POINT(0 0)', + 'optional_location': 'POINT(1 1)', + 'path': ['POINT(0 0)', 'POINT(1 1)', 'POINT(2 2)'] + }, + { + 'id': 2, + 'location': 'POINT(10 10)', + 'optional_location': None, + 'path': ['LINESTRING(0 0, 5 5)', 'LINESTRING(5 5, 10 10)'] + }, + { + 'id': 3, + 'location': 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', + 'optional_location': 'POINT(0.5 0.5)', + 'path': [] # Empty array + } + ] + + table_schema = { + "fields": [ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, + {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"}, + {"name": "path", "type": "GEOGRAPHY", "mode": "REPEATED"} + ] + } + + # Write data + with TestPipeline(is_integration_test=True) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STREAMING_INSERTS)) + + # Read and verify + with TestPipeline(is_integration_test=True) as p: + result = ( + p + | 'ReadFromBQ' >> ReadFromBigQuery( + table=table_id, + method=ReadFromBigQuery.Method.DIRECT_READ) + | 'ExtractData' >> beam.Map( + lambda row: (row['id'], len(row['path']) if row['path'] else 0))) + + expected_counts = [(1, 3), (2, 2), (3, 0)] + assert_that(result, equal_to(expected_counts)) + + @pytest.mark.it_postcommit + def test_geography_complex_geometries(self): + """Test complex GEOGRAPHY geometries and edge cases.""" + table_name = 'geography_complex' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + # Complex geometries including collections and high precision coordinates + input_data = [ + { + 'id': 1, + 'location': 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))', + 'optional_location': None + }, + { + 'id': 2, + 'location': 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ((2 2, 2 3, 3 3, 3 2, 2 2)))', + 'optional_location': 'POINT(-122.419416 37.774929)' # High precision + }, + { + 'id': 3, + 'location': 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), (1 1, 1 9, 9 9, 9 1, 1 1))', # With hole + 'optional_location': 'LINESTRING(-180 -90, 180 90)' # Extreme coordinates + } + ] + + table_schema = { + "fields": [ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, + {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} + ] + } + + expected_data = [ + (1, 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))', None), + (2, 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ((2 2, 2 3, 3 3, 3 2, 2 2)))', 'POINT(-122.419416 37.774929)'), + (3, 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), (1 1, 1 9, 9 9, 9 1, 1 1))', 'LINESTRING(-180 -90, 180 90)') + ] + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT id, location, optional_location FROM %s ORDER BY id" % table_id, + data=expected_data) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STREAMING_INSERTS)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + @pytest.mark.it_postcommit + def test_geography_storage_write_api(self): + """Test GEOGRAPHY with Storage Write API method.""" + table_name = 'geography_storage_write' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + input_data = [ + { + 'id': 1, + 'location': 'POINT(0 0)', + 'optional_location': 'POINT(1 1)' + }, + { + 'id': 2, + 'location': 'LINESTRING(0 0, 1 1)', + 'optional_location': None + } + ] + + table_schema = { + "fields": [ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, + {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} + ] + } + + expected_data = [ + (1, 'POINT(0 0)', 'POINT(1 1)'), + (2, 'LINESTRING(0 0, 1 1)', None) + ] + + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT id, location, optional_location FROM %s ORDER BY id" % table_id, + data=expected_data) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.STORAGE_WRITE_API)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + @pytest.mark.it_postcommit + def test_geography_file_loads_method(self): + """Test GEOGRAPHY with FILE_LOADS method.""" + table_name = 'geography_file_loads' + table_id = '{}.{}'.format(self.dataset_id, table_name) + + input_data = [ + { + 'id': i, + 'location': f'POINT({i} {i})', + 'optional_location': f'POINT({i+10} {i+10})' if i % 2 == 0 else None + } + for i in range(1, 11) # 10 records + ] + + table_schema = { + "fields": [ + {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, + {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, + {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} + ] + } + + # Verify count and some sample data + pipeline_verifiers = [ + BigqueryFullResultMatcher( + project=self.project, + query="SELECT COUNT(*) as count FROM %s" % table_id, + data=[(10,)]) + ] + + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + _ = ( + p + | 'CreateData' >> beam.Create(input_data) + | 'WriteToBQ' >> WriteToBigQuery( + table=table_id, + schema=table_schema, + method=WriteToBigQuery.Method.FILE_LOADS)) + + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + unittest.main() \ No newline at end of file From 01e31446fec008bd97ce0cd27a0f80ca2cbf908a Mon Sep 17 00:00:00 2001 From: liferoad Date: Fri, 12 Sep 2025 16:41:10 -0400 Subject: [PATCH 09/15] yapf --- .../io/gcp/bigquery_geography_it_test.py | 306 ++++++++++-------- 1 file changed, 173 insertions(+), 133 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index e71f07eae9ae..011fdc32b591 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -32,7 +32,8 @@ from apache_beam.io.gcp.bigquery import ReadFromBigQuery from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery -from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher +from apache_beam.io.gcp.tests.bigquery_matcher import ( + BigqueryFullResultMatcher) from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to @@ -49,7 +50,7 @@ @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class BigQueryGeographyIntegrationTests(unittest.TestCase): """Integration tests for BigQuery GEOGRAPHY data type.""" - + BIG_QUERY_DATASET_ID = 'python_geography_it_test_' def setUp(self): @@ -80,28 +81,28 @@ def tearDown(self): def create_geography_table(self, table_name, include_repeated=False): """Create a table with various GEOGRAPHY field configurations.""" table_schema = bigquery.TableSchema() - + # ID field id_field = bigquery.TableFieldSchema() id_field.name = 'id' id_field.type = 'INTEGER' id_field.mode = 'REQUIRED' table_schema.fields.append(id_field) - + # Required GEOGRAPHY field geo_required = bigquery.TableFieldSchema() geo_required.name = 'location' geo_required.type = 'GEOGRAPHY' geo_required.mode = 'REQUIRED' table_schema.fields.append(geo_required) - + # Nullable GEOGRAPHY field geo_nullable = bigquery.TableFieldSchema() geo_nullable.name = 'optional_location' geo_nullable.type = 'GEOGRAPHY' geo_nullable.mode = 'NULLABLE' table_schema.fields.append(geo_nullable) - + if include_repeated: # Repeated GEOGRAPHY field geo_repeated = bigquery.TableFieldSchema() @@ -109,7 +110,7 @@ def create_geography_table(self, table_name, include_repeated=False): geo_repeated.type = 'GEOGRAPHY' geo_repeated.mode = 'REPEATED' table_schema.fields.append(geo_repeated) - + table = bigquery.Table( tableReference=bigquery.TableReference( projectId=self.project, @@ -119,22 +120,23 @@ def create_geography_table(self, table_name, include_repeated=False): request = bigquery.BigqueryTablesInsertRequest( projectId=self.project, datasetId=self.dataset_id, table=table) self.bigquery_client.client.tables.Insert(request) - + # Wait for table to be available - _ = self.bigquery_client.get_table(self.project, self.dataset_id, table_name) + _ = self.bigquery_client.get_table( + self.project, self.dataset_id, table_name) @pytest.mark.it_postcommit def test_geography_write_and_read_basic_geometries(self): """Test writing and reading basic GEOGRAPHY geometries.""" table_name = 'geography_basic_geometries' table_id = '{}.{}'.format(self.dataset_id, table_name) - + # Test data with various WKT geometry types input_data = [ { 'id': 1, 'location': 'POINT(30 10)', - 'optional_location': 'POINT(-122.4194 37.7749)' # San Francisco + 'optional_location': ('POINT(-122.4194 37.7749)') # San Francisco }, { 'id': 2, @@ -143,29 +145,36 @@ def test_geography_write_and_read_basic_geometries(self): }, { 'id': 3, - 'location': 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', - 'optional_location': 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))' + 'location': ('POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))'), + 'optional_location': ('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))') }, { 'id': 4, - 'location': 'MULTIPOINT((10 40), (40 30), (20 20), (30 10))', + 'location': ('MULTIPOINT((10 40), (40 30), (20 20), (30 10))'), 'optional_location': 'POINT(0 0)' }, { 'id': 5, - 'location': 'MULTILINESTRING((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))', + 'location': ( + 'MULTILINESTRING((10 10, 20 20, 10 40), ' + '(40 40, 30 30, 40 20, 30 10))'), 'optional_location': None } ] - + table_schema = { - "fields": [ - {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, - {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} - ] + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] } - + # Write data to BigQuery with TestPipeline(is_integration_test=True) as p: _ = ( @@ -175,25 +184,32 @@ def test_geography_write_and_read_basic_geometries(self): table=table_id, schema=table_schema, method=WriteToBigQuery.Method.STREAMING_INSERTS)) - + # Read data back and verify with TestPipeline(is_integration_test=True) as p: result = ( p | 'ReadFromBQ' >> ReadFromBigQuery( - table=table_id, - method=ReadFromBigQuery.Method.DIRECT_READ) + table=table_id, method=ReadFromBigQuery.Method.DIRECT_READ) | 'ExtractGeography' >> beam.Map( - lambda row: (row['id'], row['location'], row['optional_location']))) - + lambda row: + (row['id'], row['location'], row['optional_location']))) + expected_data = [ (1, 'POINT(30 10)', 'POINT(-122.4194 37.7749)'), (2, 'LINESTRING(30 10, 10 30, 40 40)', None), - (3, 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), + ( + 3, + 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', + 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), (4, 'MULTIPOINT((10 40), (40 30), (20 20), (30 10))', 'POINT(0 0)'), - (5, 'MULTILINESTRING((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))', None) + ( + 5, + 'MULTILINESTRING((10 10, 20 20, 10 40), ' + '(40 40, 30 30, 40 20, 30 10))', + None) ] - + assert_that(result, equal_to(expected_data)) @pytest.mark.it_postcommit @@ -201,50 +217,41 @@ def test_geography_write_with_beam_rows(self): """Test writing GEOGRAPHY data using Beam Rows with GeographyType.""" table_name = 'geography_beam_rows' table_id = '{}.{}'.format(self.dataset_id, table_name) - + # Create Beam Rows with GeographyType row_elements = [ + beam.Row(id=1, location='POINT(1 1)', optional_location='POINT(2 2)'), beam.Row( - id=1, - location='POINT(1 1)', - optional_location='POINT(2 2)' - ), - beam.Row( - id=2, - location='LINESTRING(0 0, 1 1, 2 2)', - optional_location=None - ), + id=2, location='LINESTRING(0 0, 1 1, 2 2)', optional_location=None), beam.Row( id=3, location='POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', - optional_location='POINT(3 3)' - ) + optional_location='POINT(3 3)') ] - + # Expected data for verification - expected_data = [ - (1, 'POINT(1 1)', 'POINT(2 2)'), - (2, 'LINESTRING(0 0, 1 1, 2 2)', None), - (3, 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'POINT(3 3)') - ] - + expected_data = [(1, 'POINT(1 1)', 'POINT(2 2)'), + (2, 'LINESTRING(0 0, 1 1, 2 2)', None), + (3, 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))', 'POINT(3 3)')] + pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, - query="SELECT id, location, optional_location FROM %s ORDER BY id" % table_id, + query=( + "SELECT id, location, optional_location FROM %s ORDER BY id" % + table_id), data=expected_data) ] - + args = self.test_pipeline.get_full_options_as_args() - + with beam.Pipeline(argv=args) as p: _ = ( p | 'CreateRows' >> beam.Create(row_elements) | 'WriteToBQ' >> WriteToBigQuery( - table=table_id, - method=WriteToBigQuery.Method.STREAMING_INSERTS)) - + table=table_id, method=WriteToBigQuery.Method.STREAMING_INSERTS)) + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) @pytest.mark.it_postcommit @@ -252,7 +259,7 @@ def test_geography_repeated_fields(self): """Test GEOGRAPHY fields with REPEATED mode.""" table_name = 'geography_repeated' table_id = '{}.{}'.format(self.dataset_id, table_name) - + input_data = [ { 'id': 1, @@ -273,16 +280,22 @@ def test_geography_repeated_fields(self): 'path': [] # Empty array } ] - + table_schema = { - "fields": [ - {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, - {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"}, - {"name": "path", "type": "GEOGRAPHY", "mode": "REPEATED"} - ] + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }, { + "name": "path", "type": "GEOGRAPHY", "mode": "REPEATED" + }] } - + # Write data with TestPipeline(is_integration_test=True) as p: _ = ( @@ -292,17 +305,16 @@ def test_geography_repeated_fields(self): table=table_id, schema=table_schema, method=WriteToBigQuery.Method.STREAMING_INSERTS)) - + # Read and verify with TestPipeline(is_integration_test=True) as p: result = ( p | 'ReadFromBQ' >> ReadFromBigQuery( - table=table_id, - method=ReadFromBigQuery.Method.DIRECT_READ) + table=table_id, method=ReadFromBigQuery.Method.DIRECT_READ) | 'ExtractData' >> beam.Map( lambda row: (row['id'], len(row['path']) if row['path'] else 0))) - + expected_counts = [(1, 3), (2, 2), (3, 0)] assert_that(result, equal_to(expected_counts)) @@ -311,49 +323,71 @@ def test_geography_complex_geometries(self): """Test complex GEOGRAPHY geometries and edge cases.""" table_name = 'geography_complex' table_id = '{}.{}'.format(self.dataset_id, table_name) - + # Complex geometries including collections and high precision coordinates input_data = [ { 'id': 1, - 'location': 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))', + 'location': ( + 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))'), 'optional_location': None }, { 'id': 2, - 'location': 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ((2 2, 2 3, 3 3, 3 2, 2 2)))', - 'optional_location': 'POINT(-122.419416 37.774929)' # High precision + 'location': ( + 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ' + '((2 2, 2 3, 3 3, 3 2, 2 2)))'), + 'optional_location': ('POINT(-122.419416 37.774929)' + ) # High precision }, { 'id': 3, - 'location': 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), (1 1, 1 9, 9 9, 9 1, 1 1))', # With hole - 'optional_location': 'LINESTRING(-180 -90, 180 90)' # Extreme coordinates + 'location': ( + 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), ' + '(1 1, 1 9, 9 9, 9 1, 1 1))'), # With hole + 'optional_location': ('LINESTRING(-180 -90, 180 90)' + ) # Extreme coordinates } ] - + table_schema = { - "fields": [ - {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, - {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} - ] + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] } - + expected_data = [ (1, 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))', None), - (2, 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ((2 2, 2 3, 3 3, 3 2, 2 2)))', 'POINT(-122.419416 37.774929)'), - (3, 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), (1 1, 1 9, 9 9, 9 1, 1 1))', 'LINESTRING(-180 -90, 180 90)') + ( + 2, + 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ' + '((2 2, 2 3, 3 3, 3 2, 2 2)))', + 'POINT(-122.419416 37.774929)'), + ( + 3, + 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), ' + '(1 1, 1 9, 9 9, 9 1, 1 1))', + 'LINESTRING(-180 -90, 180 90)') ] - + pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, - query="SELECT id, location, optional_location FROM %s ORDER BY id" % table_id, + query=( + "SELECT id, location, optional_location FROM %s ORDER BY id" % + table_id), data=expected_data) ] - + args = self.test_pipeline.get_full_options_as_args() - + with beam.Pipeline(argv=args) as p: _ = ( p @@ -362,7 +396,7 @@ def test_geography_complex_geometries(self): table=table_id, schema=table_schema, method=WriteToBigQuery.Method.STREAMING_INSERTS)) - + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) @pytest.mark.it_postcommit @@ -370,42 +404,43 @@ def test_geography_storage_write_api(self): """Test GEOGRAPHY with Storage Write API method.""" table_name = 'geography_storage_write' table_id = '{}.{}'.format(self.dataset_id, table_name) - - input_data = [ - { - 'id': 1, - 'location': 'POINT(0 0)', - 'optional_location': 'POINT(1 1)' - }, - { - 'id': 2, - 'location': 'LINESTRING(0 0, 1 1)', - 'optional_location': None - } - ] - + + input_data = [{ + 'id': 1, 'location': 'POINT(0 0)', 'optional_location': 'POINT(1 1)' + }, + { + 'id': 2, + 'location': 'LINESTRING(0 0, 1 1)', + 'optional_location': None + }] + table_schema = { - "fields": [ - {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, - {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} - ] + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] } - - expected_data = [ - (1, 'POINT(0 0)', 'POINT(1 1)'), - (2, 'LINESTRING(0 0, 1 1)', None) - ] - + + expected_data = [(1, 'POINT(0 0)', 'POINT(1 1)'), + (2, 'LINESTRING(0 0, 1 1)', None)] + pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, - query="SELECT id, location, optional_location FROM %s ORDER BY id" % table_id, + query=( + "SELECT id, location, optional_location FROM %s ORDER BY id" % + table_id), data=expected_data) ] - + args = self.test_pipeline.get_full_options_as_args() - + with beam.Pipeline(argv=args) as p: _ = ( p @@ -414,7 +449,7 @@ def test_geography_storage_write_api(self): table=table_id, schema=table_schema, method=WriteToBigQuery.Method.STORAGE_WRITE_API)) - + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) @pytest.mark.it_postcommit @@ -422,34 +457,39 @@ def test_geography_file_loads_method(self): """Test GEOGRAPHY with FILE_LOADS method.""" table_name = 'geography_file_loads' table_id = '{}.{}'.format(self.dataset_id, table_name) - + input_data = [ { 'id': i, 'location': f'POINT({i} {i})', - 'optional_location': f'POINT({i+10} {i+10})' if i % 2 == 0 else None - } - for i in range(1, 11) # 10 records + 'optional_location': ( + f'POINT({i+10} {i+10})' if i % 2 == 0 else None) + } for i in range(1, 11) # 10 records ] - + table_schema = { - "fields": [ - {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}, - {"name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED"}, - {"name": "optional_location", "type": "GEOGRAPHY", "mode": "NULLABLE"} - ] + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, { + "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] } - + # Verify count and some sample data pipeline_verifiers = [ BigqueryFullResultMatcher( project=self.project, query="SELECT COUNT(*) as count FROM %s" % table_id, - data=[(10,)]) + data=[(10, )]) ] - + args = self.test_pipeline.get_full_options_as_args() - + with beam.Pipeline(argv=args) as p: _ = ( p @@ -458,10 +498,10 @@ def test_geography_file_loads_method(self): table=table_id, schema=table_schema, method=WriteToBigQuery.Method.FILE_LOADS)) - + hc.assert_that(p, hc.all_of(*pipeline_verifiers)) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) - unittest.main() \ No newline at end of file + unittest.main() From a1995ee0e4f3bede8b91a938d8163e0fc6f04bd4 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 16:29:45 -0400 Subject: [PATCH 10/15] feat(bigquery): add project handling and test improvements - Add _get_project method to handle project billing in BigQuery source - Update tests to explicitly specify project parameter - Fix geography test data formats and simplify test cases - Add temporary storage location for file load tests --- sdks/python/apache_beam/io/gcp/bigquery.py | 13 +++ .../io/gcp/bigquery_geography_it_test.py | 95 +++++++++++++------ 2 files changed, 78 insertions(+), 30 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 4780f948be23..d1a522a6f8be 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1028,6 +1028,16 @@ def __init__( self._step_name = step_name self._source_uuid = unique_id + def _get_project(self): + """Returns the project that queries and exports will be billed to.""" + if self.pipeline_options: + project = self.pipeline_options.view_as(GoogleCloudOptions).project + if isinstance(project, vp.ValueProvider): + project = project.get() + if project: + return project + return self.project + def _get_parent_project(self): """Returns the project that will be billed.""" if self.temp_table: @@ -1160,6 +1170,9 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self._setup_temporary_dataset(bq) self.table_reference = self._execute_query(bq) + if not self.table_reference.projectId: + self.table_reference.projectId = self._get_project() + requested_session = bq_storage.types.ReadSession() requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format( self.table_reference.projectId, diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index 011fdc32b591..d011972fbeaa 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -183,14 +183,17 @@ def test_geography_write_and_read_basic_geometries(self): | 'WriteToBQ' >> WriteToBigQuery( table=table_id, schema=table_schema, - method=WriteToBigQuery.Method.STREAMING_INSERTS)) + method=WriteToBigQuery.Method.STREAMING_INSERTS, + project=self.project)) # Read data back and verify with TestPipeline(is_integration_test=True) as p: result = ( p | 'ReadFromBQ' >> ReadFromBigQuery( - table=table_id, method=ReadFromBigQuery.Method.DIRECT_READ) + table=table_id, + project=self.project, + method=ReadFromBigQuery.Method.DIRECT_READ) | 'ExtractGeography' >> beam.Map( lambda row: (row['id'], row['location'], row['optional_location']))) @@ -202,7 +205,7 @@ def test_geography_write_and_read_basic_geometries(self): 3, 'POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), - (4, 'MULTIPOINT((10 40), (40 30), (20 20), (30 10))', 'POINT(0 0)'), + (4, 'MULTIPOINT(20 20, 10 40, 40 30, 30 10)', 'POINT(0 0)'), ( 5, 'MULTILINESTRING((10 10, 20 20, 10 40), ' @@ -218,6 +221,9 @@ def test_geography_write_with_beam_rows(self): table_name = 'geography_beam_rows' table_id = '{}.{}'.format(self.dataset_id, table_name) + # Create the table first + self.create_geography_table(table_name) + # Create Beam Rows with GeographyType row_elements = [ beam.Row(id=1, location='POINT(1 1)', optional_location='POINT(2 2)'), @@ -249,10 +255,36 @@ def test_geography_write_with_beam_rows(self): _ = ( p | 'CreateRows' >> beam.Create(row_elements) + | 'ConvertToDict' >> beam.Map( + lambda row: { + 'id': row.id, 'location': row.location, + 'optional_location': row.optional_location + }) | 'WriteToBQ' >> WriteToBigQuery( - table=table_id, method=WriteToBigQuery.Method.STREAMING_INSERTS)) - - hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + table=table_id, + method=WriteToBigQuery.Method.STREAMING_INSERTS, + schema={ + "fields": [{ + "name": "id", "type": "INTEGER", "mode": "REQUIRED" + }, + { + "name": "location", + "type": "GEOGRAPHY", + "mode": "REQUIRED" + }, + { + "name": "optional_location", + "type": "GEOGRAPHY", + "mode": "NULLABLE" + }] + })) + + # Wait a bit for streaming inserts to complete + import time + time.sleep(5) + + # Verify the data was written correctly + hc.assert_that(None, hc.all_of(*pipeline_verifiers)) @pytest.mark.it_postcommit def test_geography_repeated_fields(self): @@ -297,7 +329,8 @@ def test_geography_repeated_fields(self): } # Write data - with TestPipeline(is_integration_test=True) as p: + args = self.test_pipeline.get_full_options_as_args() + with beam.Pipeline(argv=args) as p: _ = ( p | 'CreateData' >> beam.Create(input_data) @@ -307,11 +340,13 @@ def test_geography_repeated_fields(self): method=WriteToBigQuery.Method.STREAMING_INSERTS)) # Read and verify - with TestPipeline(is_integration_test=True) as p: + with beam.Pipeline(argv=args) as p: result = ( p | 'ReadFromBQ' >> ReadFromBigQuery( - table=table_id, method=ReadFromBigQuery.Method.DIRECT_READ) + table=table_id, + method=ReadFromBigQuery.Method.DIRECT_READ, + project=self.project) | 'ExtractData' >> beam.Map( lambda row: (row['id'], len(row['path']) if row['path'] else 0))) @@ -335,18 +370,17 @@ def test_geography_complex_geometries(self): { 'id': 2, 'location': ( - 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ' - '((2 2, 2 3, 3 3, 3 2, 2 2)))'), + 'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ' + '((2 2, 3 2, 3 3, 2 3, 2 2)))'), # Fixed orientation 'optional_location': ('POINT(-122.419416 37.774929)' ) # High precision }, { 'id': 3, - 'location': ( - 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), ' - '(1 1, 1 9, 9 9, 9 1, 1 1))'), # With hole - 'optional_location': ('LINESTRING(-180 -90, 180 90)' - ) # Extreme coordinates + 'location': ('POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))' + ), # Simple polygon without holes + 'optional_location': ('LINESTRING(-122 37, -121 38)' + ) # Fixed non-antipodal coordinates } ] @@ -363,19 +397,16 @@ def test_geography_complex_geometries(self): }] } - expected_data = [ - (1, 'GEOMETRYCOLLECTION(POINT(4 6), LINESTRING(4 6, 7 10))', None), - ( - 2, - 'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)), ' - '((2 2, 2 3, 3 3, 3 2, 2 2)))', - 'POINT(-122.419416 37.774929)'), - ( - 3, - 'POLYGON((0 0, 0 10, 10 10, 10 0, 0 0), ' - '(1 1, 1 9, 9 9, 9 1, 1 1))', - 'LINESTRING(-180 -90, 180 90)') - ] + expected_data = [(1, 'LINESTRING(4 6, 7 10)', None), + ( + 2, + 'MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ' + '((2 2, 3 2, 3 3, 2 3, 2 2)))', + 'POINT(-122.419416 37.774929)'), + ( + 3, + 'POLYGON((0 0, 0 5, 5 5, 5 0, 0 0))', + 'LINESTRING(-122 37, -121 38)')] pipeline_verifiers = [ BigqueryFullResultMatcher( @@ -489,6 +520,9 @@ def test_geography_file_loads_method(self): ] args = self.test_pipeline.get_full_options_as_args() + gcs_temp_location = ( + f'gs://temp-storage-for-end-to-end-tests/' + f'bq_it_test_{int(time.time())}') with beam.Pipeline(argv=args) as p: _ = ( @@ -497,7 +531,8 @@ def test_geography_file_loads_method(self): | 'WriteToBQ' >> WriteToBigQuery( table=table_id, schema=table_schema, - method=WriteToBigQuery.Method.FILE_LOADS)) + method=WriteToBigQuery.Method.FILE_LOADS, + custom_gcs_temp_location=gcs_temp_location)) hc.assert_that(p, hc.all_of(*pipeline_verifiers)) From 7950aeb0723877be2810557477b142fa0e88bc57 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 17:13:21 -0400 Subject: [PATCH 11/15] lint --- sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index d011972fbeaa..b0ce0005bd75 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -37,7 +37,6 @@ from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to -from apache_beam.typehints.schemas import GeographyType try: from apitools.base.py.exceptions import HttpError @@ -280,7 +279,6 @@ def test_geography_write_with_beam_rows(self): })) # Wait a bit for streaming inserts to complete - import time time.sleep(5) # Verify the data was written correctly From d0c8bc805ac8d3c541daf3164255e55c14919485 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 13 Sep 2025 18:58:42 -0400 Subject: [PATCH 12/15] format --- sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index b0ce0005bd75..ebd642ad8131 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -28,12 +28,11 @@ import pytest import apache_beam as beam -from apache_beam.io.gcp.bigquery import WriteToBigQuery from apache_beam.io.gcp.bigquery import ReadFromBigQuery +from apache_beam.io.gcp.bigquery import WriteToBigQuery from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery -from apache_beam.io.gcp.tests.bigquery_matcher import ( - BigqueryFullResultMatcher) +from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to From 8e9bdcb442c375e4a15bfa580300d11e277e6f0b Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 5 Oct 2025 10:09:24 -0400 Subject: [PATCH 13/15] removed GeographyType for now --- sdks/python/apache_beam/typehints/schemas.py | 45 +------ .../apache_beam/typehints/schemas_test.py | 118 ------------------ 2 files changed, 1 insertion(+), 162 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 139bb828e870..38e5cdecfa8d 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -554,12 +554,7 @@ def typing_from_runner_api( else: logical_type_instance = LogicalType.from_runner_api( fieldtype_proto.logical_type) - # Special case for GeographyType: return the logical type class itself - # instead of the language_type to maintain semantic meaning - if fieldtype_proto.logical_type.urn == GeographyType.urn(): - return type(logical_type_instance) - else: - return logical_type_instance.language_type() + return logical_type_instance.language_type() elif type_info == "iterable_type": return Sequence[self.typing_from_runner_api( @@ -1067,45 +1062,7 @@ def _from_typing(cls, typ): return cls() -@LogicalType.register_logical_type -class GeographyType(LogicalType[str, str, str]): - """ - For internal use only; no backwards-compatibility guarantees. - - Support for BigQuery GEOGRAPHY logical type. GEOGRAPHY data type works with - Well-Known Text (WKT) format for reading and writing to BigQuery. - """ - def __init__(self, argument=""): - pass - - @classmethod - def representation_type(cls) -> type: - return str - - @classmethod - def urn(cls): - return "beam:logical_type:geography:v1" - - @classmethod - def language_type(cls): - return str - - def to_representation_type(self, value: str) -> str: - return value - - def to_language_type(self, value: str) -> str: - return value - - @classmethod - def argument_type(cls): - return str - - def argument(self): - return "" - @classmethod - def _from_typing(cls, typ): - return cls() # TODO(yathu,BEAM-10722): Investigate and resolve conflicts in logical type diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py index 329b93de5399..73db06b9a8d2 100644 --- a/sdks/python/apache_beam/typehints/schemas_test.py +++ b/sdks/python/apache_beam/typehints/schemas_test.py @@ -600,124 +600,6 @@ def test_python_callable_maps_to_logical_type(self): schema_registry=SchemaTypeRegistry()), PythonCallableWithSource) - def test_geography_type_maps_to_logical_type(self): - from apache_beam.typehints.schemas import GeographyType - self.assertEqual( - schema_pb2.FieldType( - logical_type=schema_pb2.LogicalType( - urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str), - argument_type=typing_to_runner_api(str), - argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicTypeValue(string="")))), - typing_to_runner_api(GeographyType)) - self.assertEqual( - typing_from_runner_api( - schema_pb2.FieldType( - logical_type=schema_pb2.LogicalType( - urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str), - argument_type=typing_to_runner_api(str), - argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicTypeValue(string="")))), - schema_registry=SchemaTypeRegistry()), - GeographyType) - - def test_optional_geography_type_maps_to_logical_type(self): - from apache_beam.typehints.schemas import GeographyType - self.assertEqual( - schema_pb2.FieldType( - nullable=True, - logical_type=schema_pb2.LogicalType( - urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str), - argument_type=typing_to_runner_api(str), - argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicTypeValue(string="")))), - typing_to_runner_api(Optional[GeographyType])) - self.assertEqual( - typing_from_runner_api( - schema_pb2.FieldType( - nullable=True, - logical_type=schema_pb2.LogicalType( - urn="beam:logical_type:geography:v1", - representation=typing_to_runner_api(str), - argument_type=typing_to_runner_api(str), - argument=schema_pb2.FieldValue( - atomic_value=schema_pb2.AtomicTypeValue(string="")))), - schema_registry=SchemaTypeRegistry()), - Optional[GeographyType]) - - def test_geography_type_instantiation(self): - from apache_beam.typehints.schemas import GeographyType - - # Test basic instantiation - gt = GeographyType() - self.assertEqual(gt.urn(), "beam:logical_type:geography:v1") - self.assertEqual(gt.language_type(), str) - self.assertEqual(gt.representation_type(), str) - self.assertEqual(gt.argument_type(), str) - self.assertEqual(gt.argument(), "") - - # Test instantiation with argument - gt_with_arg = GeographyType("test_arg") - self.assertEqual(gt_with_arg.argument(), "") - - def test_geography_type_conversion_methods(self): - from apache_beam.typehints.schemas import GeographyType - - gt = GeographyType() - - # Test various WKT formats - test_cases = [ - "POINT(30 10)", - "LINESTRING(30 10, 10 30, 40 40)", - "POLYGON((30 10, 40 40, 20 40, 10 20, 30 10))", - "MULTIPOINT((10 40), (40 30), (20 20), (30 10))", - "MULTILINESTRING((10 10, 20 20, 10 40), (40 40, 30 30, 40 20, 30 10))", - "GEOMETRYCOLLECTION(POINT(4 6),LINESTRING(4 6,7 10))" - ] - - for wkt in test_cases: - # Test to_representation_type - result = gt.to_representation_type(wkt) - self.assertEqual(result, wkt) - self.assertIsInstance(result, str) - - # Test to_language_type - result = gt.to_language_type(wkt) - self.assertEqual(result, wkt) - self.assertIsInstance(result, str) - - def test_geography_type_from_typing(self): - from apache_beam.typehints.schemas import GeographyType - - # Test _from_typing class method - gt = GeographyType._from_typing(str) - self.assertIsInstance(gt, GeographyType) - self.assertEqual(gt.urn(), "beam:logical_type:geography:v1") - - def test_geography_type_schema_integration(self): - from apache_beam.typehints.schemas import GeographyType, named_fields_to_schema - - # Test schema creation with GeographyType - fields = [('location', GeographyType), ('name', str)] - schema = named_fields_to_schema(fields) - - self.assertEqual(len(schema.fields), 2) - - # Check geography field - geo_field = schema.fields[0] - self.assertEqual(geo_field.name, 'location') - self.assertTrue(geo_field.type.HasField('logical_type')) - self.assertEqual( - geo_field.type.logical_type.urn, "beam:logical_type:geography:v1") - - # Check string field - str_field = schema.fields[1] - self.assertEqual(str_field.name, 'name') - self.assertEqual(str_field.type.atomic_type, schema_pb2.STRING) - def test_trivial_example(self): MyCuteClass = NamedTuple( 'MyCuteClass', From 17abb6dd60e5252a817ea61d1ca913d2a647e436 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 5 Oct 2025 10:14:02 -0400 Subject: [PATCH 14/15] restore schemas.py --- sdks/python/apache_beam/typehints/schemas.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py index 38e5cdecfa8d..c21dde426fc7 100644 --- a/sdks/python/apache_beam/typehints/schemas.py +++ b/sdks/python/apache_beam/typehints/schemas.py @@ -350,7 +350,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType: try: if LogicalType.is_known_logical_type(type_): - logical_type = type_() + logical_type = type_ else: logical_type = LogicalType.from_typing(type_) except ValueError: @@ -552,9 +552,8 @@ def typing_from_runner_api( if fieldtype_proto.logical_type.urn == PYTHON_ANY_URN: return Any else: - logical_type_instance = LogicalType.from_runner_api( - fieldtype_proto.logical_type) - return logical_type_instance.language_type() + return LogicalType.from_runner_api( + fieldtype_proto.logical_type).language_type() elif type_info == "iterable_type": return Sequence[self.typing_from_runner_api( @@ -1062,9 +1061,6 @@ def _from_typing(cls, typ): return cls() - - - # TODO(yathu,BEAM-10722): Investigate and resolve conflicts in logical type # registration when more than one logical types sharing the same language type LogicalType.register_logical_type(DecimalLogicalType) From 58c4a9c3b768326614797bb269f012e15eec8dfa Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 11 Oct 2025 15:39:01 -0400 Subject: [PATCH 15/15] added uses_gcp_java_expansion_service --- sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py index ebd642ad8131..5a506d3162f9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_geography_it_test.py @@ -427,6 +427,7 @@ def test_geography_complex_geometries(self): hc.assert_that(p, hc.all_of(*pipeline_verifiers)) + @pytest.mark.uses_gcp_java_expansion_service @pytest.mark.it_postcommit def test_geography_storage_write_api(self): """Test GEOGRAPHY with Storage Write API method."""