From 347018b0dc62ff27a48edb29e5232d47f4122d77 Mon Sep 17 00:00:00 2001 From: Nihar Doshi Date: Fri, 19 Jan 2024 11:33:12 +0530 Subject: [PATCH 1/4] support for complex data struct in hive --- .../src/metadata/ingestion/source/database/hive/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ingestion/src/metadata/ingestion/source/database/hive/utils.py b/ingestion/src/metadata/ingestion/source/database/hive/utils.py index f512ce5a6dda..2ef316b5aac7 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/utils.py @@ -26,6 +26,7 @@ "binary": types.BINARY, "char": types.CHAR, "varchar": types.VARCHAR, + "decimal": types.DECIMAL } ) @@ -58,6 +59,9 @@ def get_columns( charlen = re.search(r"\(([\d,]+)\)", col_raw_type.lower()) if charlen: charlen = charlen.group(1) + match = re.search(r"struct", attype) + if match: + attype = match.group(1) if attype == "decimal": prec, scale = charlen.split(",") args = (int(prec), int(scale)) From 064c56518a58b6c3c51181fb401e58361f7c2889 Mon Sep 17 00:00:00 2001 From: Nihar Doshi Date: Fri, 19 Jan 2024 11:34:59 +0530 Subject: [PATCH 2/4] support for complex data struct in hive --- ingestion/src/metadata/ingestion/source/database/hive/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/hive/utils.py b/ingestion/src/metadata/ingestion/source/database/hive/utils.py index 2ef316b5aac7..5f597eca7386 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/utils.py @@ -26,7 +26,7 @@ "binary": types.BINARY, "char": types.CHAR, "varchar": types.VARCHAR, - "decimal": types.DECIMAL + "decimal": types.DECIMAL, } ) From cde20ef8a0407d632d5f1df24d2a4d5c19c43bfa Mon Sep 17 00:00:00 2001 From: Nihar Doshi Date: Fri, 19 Jan 2024 16:03:35 +0530 Subject: [PATCH 3/4] added test case for hive source --- .../ingestion/source/database/hive/utils.py | 4 +- .../tests/unit/topology/database/test_hive.py | 411 ++++++++++++++++++ 2 files changed, 413 insertions(+), 2 deletions(-) create mode 100644 ingestion/tests/unit/topology/database/test_hive.py diff --git a/ingestion/src/metadata/ingestion/source/database/hive/utils.py b/ingestion/src/metadata/ingestion/source/database/hive/utils.py index 5f597eca7386..c99e5ea2fa80 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/utils.py @@ -59,9 +59,9 @@ def get_columns( charlen = re.search(r"\(([\d,]+)\)", col_raw_type.lower()) if charlen: charlen = charlen.group(1) - match = re.search(r"struct", attype) + match = re.search(r"struct<([a-zA-Z0-9]+):([a-zA-Z0-9]+)>", attype) if match: - attype = match.group(1) + attype = match.group(2) if attype == "decimal": prec, scale = charlen.split(",") args = (int(prec), int(scale)) diff --git a/ingestion/tests/unit/topology/database/test_hive.py b/ingestion/tests/unit/topology/database/test_hive.py new file mode 100644 index 000000000000..853a43953c48 --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_hive.py @@ -0,0 +1,411 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test Hive using the topology +""" + +import types +from unittest import TestCase +from unittest.mock import patch + +from sqlalchemy.types import INTEGER, VARCHAR, Integer, String + +import metadata.ingestion.source.database.hive.utils as hive_dialect +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.table import ( + Column, + ColumnName, + DataType, + TableType, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.source.database.hive.metadata import HiveSource + +mock_hive_config = { + "source": { + "type": "hive", + "serviceName": "sample_hive", + "serviceConnection": { + "config": { + "type": "Hive", + "databaseSchema": "test_database_schema", + "username": "username", + "hostPort": "localhost:1466", + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc" + "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE" + "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB" + "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN" + "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" + "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + +MOCK_DATABASE_SERVICE = DatabaseService( + id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", + name="hive_source_test", + connection=DatabaseConnection(), + serviceType=DatabaseServiceType.Hive, +) + +MOCK_DATABASE = Database( + id="a58b1856-729c-493b-bc87-6d2269b43ec0", + name="sample_database", + fullyQualifiedName="hive_source_test.sample_database", + displayName="sample_database", + description="", + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="databaseService" + ), +) + +MOCK_DATABASE_SCHEMA = DatabaseSchema( + id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", + name="sample_schema", + fullyQualifiedName="hive_source_test.sample_database.sample_schema", + service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"), + database=EntityReference( + id="a58b1856-729c-493b-bc87-6d2269b43ec0", + type="database", + ), +) + +MOCK_COLUMN_VALUE = [ + { + "name": "sample_col_1", + "type": VARCHAR(), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "varchar(50)", + "comment": None, + }, + { + "name": "sample_col_2", + "type": INTEGER(), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "int", + "comment": None, + }, + { + "name": "sample_col_3", + "type": VARCHAR(), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "varchar(50)", + "comment": None, + }, + { + "name": "sample_col_4", + "type": VARCHAR(), + "nullable": True, + "default": None, + "autoincrement": False, + "comment": None, + "system_data_type": "varchar(50)", + }, +] + +EXPECTED_DATABASE = [ + CreateDatabaseRequest( + name=EntityName(__root__="sample_database"), + displayName=None, + description=None, + tags=None, + owner=None, + service=FullyQualifiedEntityName(__root__="hive_source_test"), + dataProducts=None, + default=False, + retentionPeriod=None, + extension=None, + sourceUrl=None, + domain=None, + lifeCycle=None, + sourceHash=None, + ) +] + +EXPECTED_DATABASE_SCHEMA = [ + CreateDatabaseSchemaRequest( + name=EntityName(__root__="sample_schema"), + displayName=None, + description=None, + owner=None, + database=FullyQualifiedEntityName(__root__="hive_source_test.sample_database"), + dataProducts=None, + tags=None, + retentionPeriod=None, + extension=None, + sourceUrl=None, + domain=None, + lifeCycle=None, + sourceHash=None, + ) +] + +EXPECTED_TABLE = [ + CreateTableRequest( + name=EntityName(__root__="sample_table"), + displayName=None, + description=None, + tableType=TableType.Regular.name, + columns=[ + Column( + name=ColumnName(__root__="sample_col_1"), + displayName=None, + dataType=DataType.VARCHAR.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="varchar(50)", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + Column( + name=ColumnName(__root__="sample_col_2"), + displayName=None, + dataType=DataType.INT.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="int", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + Column( + name=ColumnName(__root__="sample_col_3"), + displayName=None, + dataType=DataType.VARCHAR.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="varchar(50)", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + Column( + name=ColumnName(__root__="sample_col_4"), + displayName=None, + dataType=DataType.VARCHAR.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="varchar(50)", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + ], + tableConstraints=[], + tablePartition=None, + tableProfilerConfig=None, + owner=None, + databaseSchema=FullyQualifiedEntityName( + __root__="hive_source_test.sample_database.sample_schema" + ), + tags=None, + viewDefinition=None, + retentionPeriod=None, + extension=None, + sourceUrl=None, + domain=None, + dataProducts=None, + fileFormat=None, + lifeCycle=None, + sourceHash=None, + ) +] + +EXPECTED_COMPLEX_COL_TYPE = [ + { + "name": "id", + "type": Integer, + "comment": None, + "nullable": True, + "default": None, + "system_data_type": "int", + "is_complex": False, + }, + { + "name": "data", + "type": String(length=20, collation=0), + "comment": None, + "nullable": True, + "default": None, + "system_data_type": "struct>", + "is_complex": True, + }, + { + "name": "data2", + "type": String(length=20, collation=0), + "comment": None, + "nullable": True, + "default": None, + "system_data_type": "struct", + "is_complex": True, + }, +] + + +class HiveUnitTest(TestCase): + """ + Implements the necessary methods to extract + Hive Unit Test + """ + + @patch( + "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection" + ) + def __init__( + self, + methodName, + test_connection, + ) -> None: + super().__init__(methodName) + test_connection.return_value = False + self.config = OpenMetadataWorkflowConfig.parse_obj(mock_hive_config) + self.hive = HiveSource.create( + mock_hive_config["source"], + self.config.workflowConfig.openMetadataServerConfig, + ) + self.hive.context.__dict__[ + "database_service" + ] = MOCK_DATABASE_SERVICE.name.__root__ + self.hive.inspector = types.SimpleNamespace() + + self.hive.inspector.get_pk_constraint = lambda table_name, schema_name: [] + self.hive.inspector.get_unique_constraints = lambda table_name, schema_name: [] + self.hive.inspector.get_foreign_keys = lambda table_name, schema_name: [] + + def test_yield_database(self): + assert EXPECTED_DATABASE == [ + either.right for either in self.hive.yield_database(MOCK_DATABASE.name) + ] + + self.hive.context.__dict__[ + "database_service" + ] = MOCK_DATABASE_SERVICE.name.__root__ + self.hive.context.__dict__["database"] = MOCK_DATABASE.name.__root__ + + def test_yield_schema(self): + assert EXPECTED_DATABASE_SCHEMA == [ + either.right + for either in self.hive.yield_database_schema( + schema_name=MOCK_DATABASE_SCHEMA.name.__root__ + ) + ] + + self.hive.context.__dict__[ + "database_schema" + ] = MOCK_DATABASE_SCHEMA.name.__root__ + + def test_yield_table(self): + self.hive.inspector.get_columns = ( + lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE + ) + assert EXPECTED_TABLE == [ + either.right + for either in self.hive.yield_table(("sample_table", "Regular")) + ] + + def test_col_data_type(self): + """ + Test different col type ingested as expected + """ + table_columns = [ + ("id", "int", ""), + ("data", "struct>", ""), + ("data2", "struct", ""), + ] + hive_dialect._get_table_columns = ( # pylint: disable=protected-access + lambda connection, table_name, schema_name: table_columns + ) + + col_list = list( + hive_dialect.get_columns( + self=hive_dialect, + connection=mock_hive_config["source"], + table_name="sample_table", + schema="sample_schema", + ) + ) + for _, (expected, original) in enumerate( + zip(EXPECTED_COMPLEX_COL_TYPE, col_list) + ): + + def custom_eq(self, __value: object) -> bool: + return ( + self.length == __value.length + and self.collation == __value.collation + ) + + String.__eq__ = custom_eq + self.assertEqual(expected, original) From 3582c3458535dde6d38ff70a41c09dfde886c267 Mon Sep 17 00:00:00 2001 From: Nihar Doshi Date: Fri, 19 Jan 2024 16:53:34 +0530 Subject: [PATCH 4/4] added test case for hive source --- .../src/metadata/ingestion/source/database/hive/utils.py | 5 ++--- ingestion/tests/unit/topology/database/test_hive.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/hive/utils.py b/ingestion/src/metadata/ingestion/source/database/hive/utils.py index c99e5ea2fa80..c7f37b7a9c35 100644 --- a/ingestion/src/metadata/ingestion/source/database/hive/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/hive/utils.py @@ -59,12 +59,11 @@ def get_columns( charlen = re.search(r"\(([\d,]+)\)", col_raw_type.lower()) if charlen: charlen = charlen.group(1) - match = re.search(r"struct<([a-zA-Z0-9]+):([a-zA-Z0-9]+)>", attype) - if match: - attype = match.group(2) if attype == "decimal": prec, scale = charlen.split(",") args = (int(prec), int(scale)) + elif attype.startswith("struct"): + args = [] else: args = (int(charlen),) coltype = coltype(*args) diff --git a/ingestion/tests/unit/topology/database/test_hive.py b/ingestion/tests/unit/topology/database/test_hive.py index 853a43953c48..8ed0ebc3521b 100644 --- a/ingestion/tests/unit/topology/database/test_hive.py +++ b/ingestion/tests/unit/topology/database/test_hive.py @@ -296,7 +296,7 @@ }, { "name": "data", - "type": String(length=20, collation=0), + "type": String(), "comment": None, "nullable": True, "default": None, @@ -305,7 +305,7 @@ }, { "name": "data2", - "type": String(length=20, collation=0), + "type": String(), "comment": None, "nullable": True, "default": None,