Skip to content

Commit

Permalink
Fixes #5448: Implement initial Iceberg Connector using PyIceberg (#14825
Browse files Browse the repository at this point in the history
)

* Create the iceberg connection schema

* Link the IcebergConnection configuration with the forms on the UI

* Add the pyiceberg dependency on the ingestion package

* Create the get_connection and test_connection functions

* First iteration on the iceberg ingestion logic

* Add a more comprehensive implementation of the Iceberg Source

* Add UnitTests

* Update icebergConnection definition

* Update the iceberg source code based on new schema

* Updated icebergConnection schema for simplicity and to be able to configure Converters

* Updated setup dependencies to be more flexible

* Updated get_owner_ref logic

* Fix formatting

* Changed the icebergConnection json schema structure to enable the ClassConverters

* Add the IcebergCatalog and IcebergFileSystem ClassConverters

* Refactor the code to take into account the new jsonSchema structure

* Fix formatting

* Add Documentation for the Iceberg Connector

* Fix Menu order for Iceberg

* ui: add Iceberg service icon and constant

* Fix DynamoDb Catalog issue due to how PyIceberg instantiates it

* Changed uri title to URI

* Fix ClassConverter for Iceberg

* Fix GetSecretValue for password types

* Fix formatting

* Fix formatting

* Add Iceberg Connector Images for the docs

* Add pylint disable for Hacky super() call

* Add Iceberg.md for the UI docs

* Fix pylint complaint

* Fix pylint complaint

* Fix UnitTests

* fix type error and unit tests

* update pipeline type checks

* Fix Sonar Cloud complaints

---------

Co-authored-by: Sachin Chaurasiya <sachinchaurasiyachotey87@gmail.com>
  • Loading branch information
IceS2 and Sachin-chaurasiya authored Jan 29, 2024
1 parent 90dc3e8 commit 373cafc
Show file tree
Hide file tree
Showing 42 changed files with 3,480 additions and 19 deletions.
17 changes: 14 additions & 3 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
# Add here versions required for multiple plugins
VERSIONS = {
"airflow": "apache-airflow==2.7.3",
"adlfs": "adlfs~=2022.11",
"avro": "avro~=1.11",
"boto3": "boto3>=1.20,<2.0", # No need to add botocore separately. It's a dep from boto3
"geoalchemy2": "GeoAlchemy2~=0.12",
"google-cloud-storage": "google-cloud-storage==1.43.0",
"gcsfs": "gcsfs~=2022.11",
"great-expectations": "great-expectations~=0.18.0",
"grpc-tools": "grpcio-tools>=1.47.2",
"msal": "msal~=1.2",
"neo4j": "neo4j~=5.3.0",
"pandas": "pandas<=2,<3",
"pyarrow": "pyarrow~=14.0",
"pydantic": "pydantic~=1.10",
"pydomo": "pydomo~=0.3",
"pymysql": "pymysql>=1.0.2",
"pyodbc": "pyodbc>=4.0.35,<5",
Expand Down Expand Up @@ -106,7 +109,7 @@
"jsonschema",
"memory-profiler",
"mypy_extensions>=0.4.3",
"pydantic~=1.10",
VERSIONS["pydantic"],
VERSIONS["pymysql"],
"python-dateutil>=2.8.1",
"python-jose~=3.3",
Expand Down Expand Up @@ -161,12 +164,12 @@
"datalake-azure": {
VERSIONS["azure-storage-blob"],
VERSIONS["azure-identity"],
"adlfs>=2022.2.0", # Python 3.7 does only support up to 2022.2.0
VERSIONS["adlfs"], # Python 3.7 does only support up to 2022.2.0
*COMMONS["datalake"],
},
"datalake-gcs": {
VERSIONS["google-cloud-storage"],
"gcsfs==2022.11.0",
VERSIONS["gcsfs"],
*COMMONS["datalake"],
},
"datalake-s3": {
Expand Down Expand Up @@ -197,6 +200,14 @@
"thrift-sasl~=0.4",
"impyla~=0.18.0",
},
"iceberg": {
"pyiceberg",
# Forcing the version of a few packages so it plays nicely with other requirements.
VERSIONS["pydantic"],
VERSIONS["adlfs"],
VERSIONS["gcsfs"],
VERSIONS["pyarrow"],
},
"impala": {
"presto-types-parser>=0.0.2",
"impyla[kerberos]~=0.18.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Catalog Factory.
"""
from typing import Dict, Type

from pyiceberg.catalog import Catalog

from metadata.generated.schema.entity.services.connections.database.iceberg.dynamoDbCatalogConnection import (
DynamoDbCatalogConnection,
)
from metadata.generated.schema.entity.services.connections.database.iceberg.glueCatalogConnection import (
GlueCatalogConnection,
)
from metadata.generated.schema.entity.services.connections.database.iceberg.hiveCatalogConnection import (
HiveCatalogConnection,
)
from metadata.generated.schema.entity.services.connections.database.iceberg.icebergCatalog import (
IcebergCatalog,
)
from metadata.generated.schema.entity.services.connections.database.iceberg.restCatalogConnection import (
RestCatalogConnection,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase
from metadata.ingestion.source.database.iceberg.catalog.dynamodb import (
IcebergDynamoDbCatalog,
)
from metadata.ingestion.source.database.iceberg.catalog.glue import IcebergGlueCatalog
from metadata.ingestion.source.database.iceberg.catalog.hive import IcebergHiveCatalog
from metadata.ingestion.source.database.iceberg.catalog.rest import IcebergRestCatalog


class IcebergCatalogFactory:
    """Factory resolving a catalog configuration into a PyIceberg Catalog.

    The class name of the configured connection is the lookup key into
    ``catalog_type_map``, which names the builder responsible for it.
    """

    catalog_type_map: Dict[str, Type[IcebergCatalogBase]] = {
        RestCatalogConnection.__name__: IcebergRestCatalog,
        HiveCatalogConnection.__name__: IcebergHiveCatalog,
        GlueCatalogConnection.__name__: IcebergGlueCatalog,
        DynamoDbCatalogConnection.__name__: IcebergDynamoDbCatalog,
    }

    @classmethod
    def from_connection(cls, catalog: IcebergCatalog) -> Catalog:
        """Build the PyIceberg Catalog matching ``catalog.connection``.

        Raises:
            NotImplementedError: if no builder is registered under the
                connection's class name.
        """
        connection_name = catalog.connection.__class__.__name__
        builder = cls.catalog_type_map.get(connection_name)

        # Happy path first: delegate to the registered builder.
        if builder:
            return builder.get_catalog(catalog)

        raise NotImplementedError(
            f"Iceberg Catalog of type ['{connection_name}'] Not Implemmented."
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Catalog base module.
"""
from abc import ABC, abstractmethod
from typing import Optional

from pyiceberg.catalog import Catalog

from metadata.generated.schema.entity.services.connections.database.iceberg.icebergCatalog import (
IcebergCatalog,
)
from metadata.ingestion.source.database.iceberg.fs import (
FileSystemConfig,
IcebergFileSystemFactory,
)


class IcebergCatalogBase(ABC):
    """Common interface for the builders that produce PyIceberg catalogs."""

    @classmethod
    @abstractmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Build the Catalog described by the given catalog configuration."""

    @staticmethod
    def get_fs_parameters(fs_config: Optional[FileSystemConfig]) -> dict:
        """Translate a file system configuration into catalog parameters.

        A falsy configuration yields an empty dict; anything else is handed
        to ``IcebergFileSystemFactory.parse``.
        """
        return IcebergFileSystemFactory.parse(fs_config) if fs_config else {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg DynamoDB Catalog
"""
import boto3
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.dynamodb import DynamoDbCatalog

from metadata.generated.schema.entity.services.connections.database.iceberg.dynamoDbCatalogConnection import (
DynamoDbCatalogConnection,
)
from metadata.generated.schema.entity.services.connections.database.iceberg.icebergCatalog import (
IcebergCatalog,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase


class CustomDynamoDbCatalog(DynamoDbCatalog):
    """DynamoDb Catalog that replaces the PyIceberg initialisation.

    Needed because PyIceberg does not handle ``__init__`` correctly: it
    instantiates the Boto3 Client without the correct configuration and has
    side effects on it.
    """

    @staticmethod
    def get_boto3_client(parameters: dict):
        """Create the DynamoDB client from a Boto3 Session.

        Only the entries of ``parameters`` that are Boto3 Session settings are
        forwarded to the Session; PyIceberg doesn't handle the Session itself.
        """
        session_keys = (
            "aws_access_key_id",
            "aws_secret_access_key",
            "aws_session_token",
            "region_name",
            "profile_name",
        )

        session_kwargs = {}
        for key, value in parameters.items():
            if key in session_keys:
                session_kwargs[key] = value

        return boto3.Session(**session_kwargs).client("dynamodb")

    def __init__(self, name: str, **properties: str):
        # HACK: starting the MRO lookup after DynamoDbCatalog runs
        # Catalog.__init__ while skipping DynamoDbCatalog.__init__.
        super(DynamoDbCatalog, self).__init__(  # pylint: disable=E1003,I0021
            name, **properties
        )

        self.dynamodb = self.get_boto3_client(properties)
        # Falls back to "iceberg" when no "table-name" property was given.
        self.dynamodb_table_name = self.properties.get("table-name", "iceberg")


class IcebergDynamoDbCatalog(IcebergCatalogBase):
    """Responsible for building a PyIceberg DynamoDB Catalog."""

    @classmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Returns a DynamoDB Catalog for the given connection and file storage.
        For more information, check the PyIceberg [docs](https://py.iceberg.apache.org/configuration/#dynamodb-catalog)

        Args:
            catalog: catalog configuration; its ``connection`` must be a
                ``DynamoDbCatalogConnection``.

        Returns:
            A ``CustomDynamoDbCatalog`` named ``catalog.name`` built from the
            warehouse location, the optional table name and the optional AWS
            configuration.

        Raises:
            RuntimeError: if ``catalog.connection`` is not a
                ``DynamoDbCatalogConnection``.
        """
        if not isinstance(catalog.connection, DynamoDbCatalogConnection):
            raise RuntimeError(
                "'connection' is not an instance of 'DynamoDbCatalogConnection'"
            )

        parameters = {"warehouse": catalog.warehouseLocation}

        if catalog.connection.tableName:
            # Add to the existing parameters: rebinding the dict here would
            # silently drop the "warehouse" entry set above.
            parameters["table-name"] = catalog.connection.tableName

        if catalog.connection.awsConfig:
            aws_config = catalog.connection.awsConfig

            parameters = {
                **parameters,
                # Must be "aws_access_key_id": that is the key
                # CustomDynamoDbCatalog.get_boto3_client forwards to the
                # Boto3 Session; any other name is filtered out.
                "aws_access_key_id": aws_config.awsAccessKeyId,
                # Unwrap the secret so Boto3 receives the plain string value.
                "aws_secret_access_key": aws_config.awsSecretAccessKey.get_secret_value()
                if aws_config.awsSecretAccessKey
                else None,
                "aws_session_token": aws_config.awsSessionToken,
                "region_name": aws_config.awsRegion,
                "profile_name": aws_config.profileName,
                # Needed because the way PyIceberg instantiates the PyArrowFileIO
                # is different from how they instantiate the Boto3 Client.
                **cls.get_fs_parameters(aws_config),
            }

        return CustomDynamoDbCatalog(catalog.name, **parameters)
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Iceberg Glue Catalog
"""
import boto3
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.glue import GlueCatalog

from metadata.generated.schema.entity.services.connections.database.iceberg.glueCatalogConnection import (
GlueCatalogConnection,
)
from metadata.generated.schema.entity.services.connections.database.iceberg.icebergCatalog import (
IcebergCatalog,
)
from metadata.ingestion.source.database.iceberg.catalog.base import IcebergCatalogBase


class IcebergGlueCatalog(IcebergCatalogBase):
    """Builder for the PyIceberg Glue Catalog."""

    @staticmethod
    def override_boto3_glue_client(catalog: GlueCatalog, parameters: dict):
        """Replace the Glue client PyIceberg created on ``catalog``.

        Works around a PyIceberg 0.4.0 bug where 'aws_access_key_id' is passed
        as 'aws_secret_key_id' and breaks. Only the entries of ``parameters``
        that are Boto3 Session settings are used to build the new client.
        """
        session_keys = (
            "aws_access_key_id",
            "aws_secret_access_key",
            "aws_session_token",
            "region_name",
            "profile_name",
        )

        session_kwargs = {
            key: parameters[key] for key in parameters if key in session_keys
        }
        catalog.glue = boto3.Session(**session_kwargs).client("glue")

    @classmethod
    def get_catalog(cls, catalog: IcebergCatalog) -> Catalog:
        """Build a Glue Catalog for the given connection and file storage.
        For more information, check the PyIceberg [docs](https://py.iceberg.apache.org/configuration/#glue-catalog)

        Raises:
            RuntimeError: if ``catalog.connection`` is not a
                ``GlueCatalogConnection``.
        """
        connection = catalog.connection
        if not isinstance(connection, GlueCatalogConnection):
            raise RuntimeError(
                "'connection' is not an instance of 'GlueCatalogConnection'"
            )

        parameters = {"warehouse": catalog.warehouseLocation}

        aws_config = connection.awsConfig
        if aws_config:
            secret_key = aws_config.awsSecretAccessKey
            parameters.update(
                {
                    "aws_access_key_id": aws_config.awsAccessKeyId,
                    "aws_secret_access_key": secret_key.get_secret_value()
                    if secret_key
                    else None,
                    "aws_session_token": aws_config.awsSessionToken,
                    "region_name": aws_config.awsRegion,
                    "profile_name": aws_config.profileName,
                    # Needed because the way PyIceberg instantiates the
                    # PyArrowFileIO is different from how they instantiate
                    # the Boto3 Client.
                    **cls.get_fs_parameters(aws_config),
                }
            )

        glue_catalog: GlueCatalog = GlueCatalog(catalog.name, **parameters)

        # HACK: Overriding the Boto3 Glue client due to PyIceberg 0.4.0 Bug.
        cls.override_boto3_glue_client(glue_catalog, parameters)

        return glue_catalog
Loading

0 comments on commit 373cafc

Please sign in to comment.