feat(ingestion): Add Iceberg source #5010

Merged: 57 commits, May 26, 2022

Commits
15bfcdf
Iceberg source: Initial draft to gather review
cccs-eric Jan 28, 2022
62cea0f
Fixes from code review
cccs-eric Jan 29, 2022
d660f13
Linting fixes
cccs-eric Jan 29, 2022
b3eae50
Iceberg source: abstracting datalake walk process
cccs-eric Feb 3, 2022
fc1ad6c
Iceberg source: Initial draft to gather review
cccs-eric Jan 28, 2022
f52fcf4
Fixes from code review
cccs-eric Jan 29, 2022
0fe7e60
Linting fixes
cccs-eric Jan 29, 2022
4eac4c8
Iceberg source: abstracting datalake walk process
cccs-eric Feb 3, 2022
910752c
Iceberg source: Initial draft to gather review
cccs-eric Jan 28, 2022
16d4a1c
Fixes from code review
cccs-eric Jan 29, 2022
bef7db3
Linting fixes
cccs-eric Jan 29, 2022
bfb305b
Iceberg source: abstracting datalake walk process
cccs-eric Feb 3, 2022
c05615b
Add test cases for Iceberg source
cccs-eric Mar 16, 2022
3c3e950
Merge branch 'feature/iceberg_source' of github.com:CybercentreCanada…
cccs-eric Mar 16, 2022
7c4b546
Change how to handle nullables
cccs-eric Mar 16, 2022
6c9cd29
Convert variable names to snakecase
cccs-eric Mar 16, 2022
a26e93c
Merge remote-tracking branch 'origin/master' into feature/iceberg_source
cccs-eric Mar 18, 2022
e7d19d1
Add iceberg logo and register data platform.
cccs-eric Mar 20, 2022
87f74d0
Add unit and integration test for Iceberg
cccs-eric Apr 4, 2022
2e80c17
Refactor how iceberg profiling is done
cccs-eric Apr 22, 2022
7101156
add acryl-iceberg-legacy dependency
cccs-eric Apr 26, 2022
6113090
enable all unit test and add descriptions about failures.
cccs-eric Apr 27, 2022
938689c
Merge branch 'master' into feature/iceberg_source
cccs-eric Apr 27, 2022
6f325aa
Update to acryl-iceberg-legacy 0.0.3
cccs-eric Apr 27, 2022
e35f067
ensure CI only activates iceberg for 3.7+
shirshanka Apr 29, 2022
c0655be
remove iceberg from base list
shirshanka Apr 29, 2022
a06ee61
Disable iceberg integration test until avro bug is resolved
cccs-eric Apr 29, 2022
3c06e2c
fix lint issue
cccs-eric Apr 29, 2022
c6e1dbe
skip test when iceberg package not available
cccs-eric Apr 30, 2022
3a70f35
skip test when iceberg package not available
cccs-eric Apr 30, 2022
b995614
fix for iceberg error not finding field and add profiling test
cccs-eric May 4, 2022
4530f15
Merge branch 'master' into feature/iceberg_source
cccs-eric May 4, 2022
16bc529
improve assertion messages
cccs-eric May 4, 2022
1fa32d2
improve assertion messages
cccs-eric May 4, 2022
a5bb71b
add source documentation
cccs-eric May 5, 2022
91265f3
Merge branch 'master' into feature/iceberg_source
cccs-eric May 5, 2022
12ecdf2
Iceberg: Populate ArrayType's nestedType & Map's key&value types in t…
rslanka May 10, 2022
2f978b5
Merge pull request #1 from datahub-project/feature/iceberg_source
cccs-eric May 10, 2022
f408559
add extra unit tests for map and array nested types
cccs-eric May 10, 2022
fade9d7
Iceberg: Use logical type in the contained types for ArrayTypeClass &…
rslanka May 11, 2022
299a7f8
Merge pull request #2 from datahub-project/feature/iceberg_source
cccs-eric May 12, 2022
29a93e8
move main code out of init.py to iceberg.py
cccs-eric May 13, 2022
d4947b4
add support for ADLS client secret authentication
cccs-eric May 13, 2022
3feb7de
Code Review changes.
rslanka May 13, 2022
f10130c
Merge pull request #3 from datahub-project/feature/iceberg_source
cccs-eric May 16, 2022
d5e68c0
fixes from code review
cccs-eric May 16, 2022
41472fa
Merge branch 'master' into feature/iceberg_source
cccs-eric May 16, 2022
c4c8cdb
add iceberg dependency for tests
cccs-eric May 16, 2022
7a03df6
Merge branch 'feature/iceberg_source' of github.com:CybercentreCanada…
cccs-eric May 16, 2022
ee1d0cc
add iceberg dependency for tests
cccs-eric May 17, 2022
c6aed6a
fix profiling null proportion calculation
cccs-eric May 23, 2022
0dbae34
Merge branch 'master' into feature/iceberg_source
rslanka May 26, 2022
078611c
Fix mypy issue in iceberg_profiler.py.
rslanka May 26, 2022
dfb7f26
Merge branch 'master' into feature/iceberg_source
rslanka May 26, 2022
7b2cf33
Fix the integration test due to the missing freeze_time decorator.
rslanka May 26, 2022
4be5dc1
Update golden files due to changes to avro nested type population + o…
rslanka May 26, 2022
64efb13
Fix bigquery_usage golden files.
rslanka May 26, 2022
Binary file added datahub-web-react/src/images/iceberglogo.png
23 changes: 23 additions & 0 deletions metadata-ingestion/docs/sources/iceberg/iceberg.md
@@ -0,0 +1,23 @@
### Concept Mapping

<!-- This should be a manual mapping of concepts from the source to the DataHub Metadata Model -->
<!-- Authors should provide as much context as possible about how this mapping was generated, including assumptions made, known shortcuts, & any other caveats -->

This ingestion source maps the following Source System Concepts to DataHub Concepts:

<!-- Remove all unnecessary/irrelevant DataHub Concepts -->

| Source Concept | DataHub Concept | Notes |
| -- | -- | -- |
| `iceberg` | [Data Platform](docs/generated/metamodel/entities/dataPlatform.md) | |
| Table | [Dataset](docs/generated/metamodel/entities/dataset.md) | Each Iceberg table maps to a Dataset named using the parent folders. If a table is stored under `my/namespace/table`, the dataset name will be `my.namespace.table`. If a [Platform Instance](https://datahubproject.io/docs/platform-instances/) is configured, it will be used as a prefix: `<platform_instance>.my.namespace.table`. |
| [Table property](https://iceberg.apache.org/docs/latest/configuration/#table-properties) | [User (a.k.a CorpUser)](docs/generated/metamodel/entities/corpuser.md) | The value of a table property can be used as the name of a CorpUser owner. This table property name can be configured with the source option `user_ownership_property`. |
| [Table property](https://iceberg.apache.org/docs/latest/configuration/#table-properties) | CorpGroup | The value of a table property can be used as the name of a CorpGroup owner. This table property name can be configured with the source option `group_ownership_property`. |
| Table parent folders (excluding [warehouse catalog location](https://iceberg.apache.org/docs/latest/configuration/#catalog-properties)) | Container | Available in a future release |
| [Table schema](https://iceberg.apache.org/spec/#schemas-and-data-types) | SchemaField | Maps to the fields defined within the Iceberg table schema definition. |
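The dataset naming rule in the table above (parent folders joined with dots, optionally prefixed by a platform instance) can be illustrated with a short sketch. `dataset_name_for_table` is a hypothetical helper written for this example, not a function from this PR:

from typing import Optional

def dataset_name_for_table(table_path: str, platform_instance: Optional[str] = None) -> str:
    # "my/namespace/table" -> "my.namespace.table"
    name = table_path.strip("/").replace("/", ".")
    # A configured Platform Instance is prepended as a prefix.
    if platform_instance:
        name = f"{platform_instance}.{name}"
    return name

assert dataset_name_for_table("my/namespace/table") == "my.namespace.table"
assert (
    dataset_name_for_table("my/namespace/table", "my_iceberg_catalog")
    == "my_iceberg_catalog.my.namespace.table"
)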

## Troubleshooting

### [Common Issue]

[Provide description of common issues with this integration and steps to resolve]
22 changes: 22 additions & 0 deletions metadata-ingestion/docs/sources/iceberg/iceberg_recipe.yml
@@ -0,0 +1,22 @@
source:
  type: "iceberg"
  config:
    env: PROD
    adls:
      # Will be translated to https://{account_name}.dfs.core.windows.net
      account_name: my_adls_account
      # Can use sas_token or account_key
      sas_token: "${SAS_TOKEN}"
      # account_key: "${ACCOUNT_KEY}"
      container_name: warehouse
      base_path: iceberg
    platform_instance: my_iceberg_catalog
    table_pattern:
      allow:
        - marketing.*
    profiling:
      enabled: true

sink:
  # sink configs
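The `table_pattern.allow` entries in the recipe above are regular expressions matched against the dataset name, so `marketing.*` keeps only tables whose names start with `marketing`; the `${SAS_TOKEN}` reference is expected to be resolved from the environment when the recipe is loaded. A rough sketch of the filtering behaviour; the `is_allowed` helper below is illustrative and is not DataHub's actual `AllowDenyPattern` implementation:

import re
from typing import List

def is_allowed(dataset_name: str, allow_patterns: List[str]) -> bool:
    # A table is ingested only if its name matches at least one allow pattern.
    return any(re.match(pattern, dataset_name) for pattern in allow_patterns)

assert is_allowed("marketing.clicks", ["marketing.*"])
assert not is_allowed("finance.invoices", ["marketing.*"])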

15 changes: 13 additions & 2 deletions metadata-ingestion/setup.py
@@ -21,7 +21,7 @@ def get_long_description():


base_requirements = {
    # Compatability.
    # Compatibility.
    "dataclasses>=0.6; python_version < '3.7'",
    # Typing extension should be >=3.10.0.2 ideally but we can't restrict due to Airflow 2.0.2 dependency conflict
    "typing_extensions>=3.7.4.3 ; python_version < '3.8'",
@@ -146,6 +146,12 @@ def get_long_description():
    "pyspark==3.0.3",
}

iceberg_common = {
    # Iceberg Python SDK
    "acryl-iceberg-legacy==0.0.4",
    "azure-identity==1.10.0",
}

s3_base = {
    *data_lake_base,
    "moto[s3]",
@@ -195,7 +201,7 @@ def get_long_description():
    "feast": {"feast==0.18.0", "flask-openid>=1.3.0"},
    "glue": aws_common,
    # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
    "hana": sql_common | {"sqlalchemy-hana>=0.5.0","hdbcli>=2.11.20"},
    "hana": sql_common | {"sqlalchemy-hana>=0.5.0", "hdbcli>=2.11.20"},
    "hive": sql_common
    | {
        # Acryl Data maintains a fork of PyHive
@@ -204,6 +210,7 @@ def get_long_description():
        # - 0.6.12 adds support for Spark Thrift Server
        "acryl-pyhive[hive]>=0.6.13"
    },
    "iceberg": iceberg_common,
    "kafka": {*kafka_common, *kafka_protobuf},
    "kafka-connect": sql_common | {"requests", "JPype1"},
    "ldap": {"python-ldap>=2.4"},
@@ -357,6 +364,7 @@ def get_long_description():
        dependency
        for plugin in [
            "feast",
            "iceberg",
            "lookml",
        ]
        for dependency in plugins[plugin]
@@ -368,6 +376,7 @@ def get_long_description():
    {
        dependency
        for plugin in [
            "iceberg",
            "lookml",
        ]
        for dependency in plugins[plugin]
@@ -421,6 +430,7 @@ def get_long_description():
        for plugin in [
            "athena",
            "feast",
            "iceberg",
        ]
        for dependency in plugins[plugin]
    }
@@ -476,6 +486,7 @@ def get_long_description():
        "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
        "nifi = datahub.ingestion.source.nifi:NifiSource",
        "powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
        "iceberg = datahub.ingestion.source.iceberg.iceberg:IcebergSource",
        "vertica = datahub.ingestion.source.sql.vertica:VerticaSource",
        "presto-on-hive = datahub.ingestion.source.sql.presto_on_hive:PrestoOnHiveSource",
        "pulsar = datahub.ingestion.source.pulsar:PulsarSource",
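The `setup.py` changes above do two things: they add an `iceberg` extra built from `iceberg_common` (so the source would typically be installed with `pip install 'acryl-datahub[iceberg]'`), and they register an entry point in the `datahub.ingestion.source.plugins` group mapping the source type `iceberg` to `IcebergSource`. A hedged sketch of how such an entry point can be resolved at runtime; the real lookup goes through DataHub's source registry rather than this snippet:

# Illustrative only: resolve the "iceberg" entry point declared in setup.py.
# Uses the Python 3.10+ selection API of importlib.metadata.
from importlib.metadata import entry_points

iceberg_ep = next(
    ep
    for ep in entry_points(group="datahub.ingestion.source.plugins")
    if ep.name == "iceberg"
)
IcebergSource = iceberg_ep.load()  # datahub.ingestion.source.iceberg.iceberg:IcebergSource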
72 changes: 53 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py
@@ -1,6 +1,6 @@
import json
import logging
from typing import Any, Callable, Dict, Generator, List, Optional, Union
from typing import Any, Callable, Dict, Generator, List, Optional, Type, Union

import avro.schema

@@ -70,7 +70,7 @@ class AvroToMceSchemaConverter:
    # FieldPath format version.
    version_string: str = "[version=2.0]"

    field_type_mapping: Dict[str, Any] = {
    field_type_mapping: Dict[str, Type] = {
        AVRO_TYPE_NULL: NullTypeClass,
        "bool": BooleanTypeClass,
        "boolean": BooleanTypeClass,
@@ -88,7 +88,7 @@ class AvroToMceSchemaConverter:
        "fixed": FixedTypeClass,
    }

    field_logical_type_mapping: Dict[str, Any] = {
    field_logical_type_mapping: Dict[str, Type] = {
        "date": DateTypeClass,
        "decimal": NumberTypeClass,
        "time-micros": TimeTypeClass,
@@ -128,17 +128,50 @@ def __init__(self, is_key_schema: bool, default_nullable: bool = False) -> None:
            avro.schema.LogicalSchema: self._gen_non_nested_to_mce_fields,
        }

    @staticmethod
    def _get_type_name(
        avro_schema: avro.schema.Schema, logical_if_present: bool = False
    ) -> str:
        logical_type_name: Optional[str] = None
        if logical_if_present:
            logical_type_name = getattr(
                avro_schema, "logical_type", None
            ) or avro_schema.props.get("logicalType")
        return logical_type_name or str(
            getattr(avro_schema.type, "type", avro_schema.type)
        )

    @staticmethod
    def _get_column_type(
        self, field_type: Union[str, dict], logical_type: Optional[str]
        avro_schema: avro.schema.Schema, logical_type: Optional[str]
    ) -> SchemaFieldDataType:
        tp = field_type
        if hasattr(tp, "type"):
            tp = tp.type # type: ignore
        tp = str(tp)
        TypeClass: Any = self.field_type_mapping.get(tp)
        type_name: str = AvroToMceSchemaConverter._get_type_name(avro_schema)
        TypeClass: Optional[Type] = AvroToMceSchemaConverter.field_type_mapping.get(
            type_name
        )
        if logical_type is not None:
            TypeClass = self.field_logical_type_mapping.get(logical_type, TypeClass)
            TypeClass = AvroToMceSchemaConverter.field_logical_type_mapping.get(
                logical_type, TypeClass
            )
        assert TypeClass is not None
        dt = SchemaFieldDataType(type=TypeClass())
        # Handle Arrays and Maps
        if isinstance(dt.type, ArrayTypeClass) and isinstance(
            avro_schema, avro.schema.ArraySchema
        ):
            dt.type.nestedType = [
                AvroToMceSchemaConverter._get_type_name(
                    avro_schema.items, logical_if_present=True
                )
            ]
        elif isinstance(dt.type, MapTypeClass) and isinstance(
            avro_schema, avro.schema.MapSchema
        ):
            # Avro map's key is always a string. See: https://avro.apache.org/docs/current/spec.html#Maps
            dt.type.keyType = "string"
            dt.type.valueType = AvroToMceSchemaConverter._get_type_name(
                avro_schema.values, logical_if_present=True
            )
        return dt

    def _is_nullable(self, schema: avro.schema.Schema) -> bool:
@@ -282,20 +315,21 @@ def emit(self) -> Generator[SchemaField, None, None]:
                tags=[TagAssociationClass(tag="urn:li:tag:Deprecated")]
            )

            logical_type_name: Optional[str] = (
                # logicalType nested inside type
                getattr(actual_schema, "logical_type", None)
                or actual_schema.props.get("logicalType")
                # bare logicalType
                or self._actual_schema.props.get("logicalType")
            )

            field = SchemaField(
                fieldPath=field_path,
                # Populate it with the simple native type for now.
                nativeDataType=native_data_type,
                type=self._converter._get_column_type(
                    actual_schema.type,
                    (
                        getattr(
                            actual_schema, "logical_type", None
                        ) # logicalType nested inside type
                        or self._actual_schema.props.get(
                            "logicalType"
                        ) # bare logicalType
                    ),
                    actual_schema,
                    logical_type_name,
                ),
                description=description,
                recursive=False,
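The `_get_column_type` changes above populate `nestedType` for Avro arrays and `keyType`/`valueType` for Avro maps, preferring a contained schema's logical type (for example `decimal`) over its physical type when one is present; map keys are always reported as `string` per the Avro spec. A standalone sketch of that mapping over plain Avro schema dicts; it mirrors the intent for illustration and is not the converter code itself:

from typing import Any, Dict

def contained_type_name(avro_type: Any) -> str:
    # Prefer the logical type ("decimal", "date", ...) over the physical type.
    if isinstance(avro_type, dict):
        return avro_type.get("logicalType") or str(avro_type.get("type"))
    return str(avro_type)

def describe_container(avro_schema: Dict[str, Any]) -> Dict[str, Any]:
    if avro_schema["type"] == "array":
        return {"nestedType": [contained_type_name(avro_schema["items"])]}
    if avro_schema["type"] == "map":
        # Avro map keys are always strings.
        return {"keyType": "string", "valueType": contained_type_name(avro_schema["values"])}
    return {}

assert describe_container(
    {"type": "array", "items": {"type": "bytes", "logicalType": "decimal"}}
) == {"nestedType": ["decimal"]}
assert describe_container({"type": "map", "values": "long"}) == {
    "keyType": "string",
    "valueType": "long",
}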
@@ -0,0 +1,83 @@
from typing import Dict, Optional, Union

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient
from pydantic import Field, root_validator

from datahub.configuration import ConfigModel
from datahub.configuration.common import ConfigurationError


class AdlsSourceConfig(ConfigModel):
    """
    Common Azure credentials config.

    https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python
    """

    base_path: str = Field(
        default="/",
        description="Base folder in hierarchical namespaces to start from.",
    )
    container_name: str = Field(
        description="Azure storage account container name.",
    )
    account_name: str = Field(
        description="Name of the Azure storage account. See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)",
    )
    account_key: Optional[str] = Field(
        description="Azure storage account access key that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
    )
    sas_token: Optional[str] = Field(
        description="Azure storage account Shared Access Signature (SAS) token that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
    )
    client_secret: Optional[str] = Field(
        description="Azure client secret that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
    )
    client_id: Optional[str] = Field(
        description="Azure client (Application) ID required when a `client_secret` is used as a credential.",
    )
    tenant_id: Optional[str] = Field(
        description="Azure tenant (Directory) ID required when a `client_secret` is used as a credential.",
    )

    def get_abfss_url(self, folder_path: str = "") -> str:
        if not folder_path.startswith("/"):
            folder_path = f"/{folder_path}"
        return f"abfss://{self.container_name}@{self.account_name}.dfs.core.windows.net{folder_path}"

    def get_filesystem_client(self) -> FileSystemClient:
        return self.get_service_client().get_file_system_client(self.container_name)

    def get_service_client(self) -> DataLakeServiceClient:
        return DataLakeServiceClient(
            account_url=f"https://{self.account_name}.dfs.core.windows.net",
            credential=self.get_credentials(),
        )

    def get_credentials(
        self,
    ) -> Union[Optional[str], ClientSecretCredential]:
        if self.client_id and self.client_secret and self.tenant_id:
            return ClientSecretCredential(
                tenant_id=self.tenant_id,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
        return self.sas_token if self.sas_token is not None else self.account_key

    @root_validator()
    def _check_credential_values(cls, values: Dict) -> Dict:
        if (
            values.get("account_key")
            or values.get("sas_token")
            or (
                values.get("client_id")
                and values.get("client_secret")
                and values.get("tenant_id")
            )
        ):
            return values
        raise ConfigurationError(
            "credentials missing, requires one combination of account_key or sas_token or (client_id and client_secret and tenant_id)"
        )
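A minimal usage sketch for `AdlsSourceConfig` as defined above, assuming the `azure-identity` and `azure-storage-file-datalake` packages are installed; all values are placeholders:

# Illustrative: SAS-token authentication satisfies the credential validator.
config = AdlsSourceConfig(
    account_name="my_adls_account",
    container_name="warehouse",
    base_path="iceberg",
    sas_token="<sas-token>",
)

assert (
    config.get_abfss_url("iceberg")
    == "abfss://warehouse@my_adls_account.dfs.core.windows.net/iceberg"
)
# get_credentials() returns the SAS token here; with client_id, client_secret and
# tenant_id set instead, it would return a ClientSecretCredential.
filesystem_client = config.get_filesystem_client()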