Skip to content

Commit

Permalink
Splittin abs utils into multiple files to not include abs specific in…
Browse files Browse the repository at this point in the history
…cludes which can brake other sources
  • Loading branch information
treff7es committed Jul 19, 2024
1 parent 452b94f commit 92f0daa
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 67 deletions.
6 changes: 4 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/abs/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
from datahub.ingestion.source.abs.report import DataLakeSourceReport
from datahub.ingestion.source.azure.abs_util import (
from datahub.ingestion.source.azure.abs_folder_utils import (
get_abs_properties,
get_abs_tags,
list_folders,
)
from datahub.ingestion.source.azure.abs_utils import (
get_container_name,
get_container_relative_path,
get_key_prefix,
list_folders,
strip_abs_prefix,
)
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import logging
import os
import re
from typing import Dict, Iterable, List, Optional

from azure.storage.blob import BlobProperties
Expand All @@ -10,67 +8,10 @@
from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass

ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)

logging.getLogger("py4j").setLevel(logging.ERROR)
logger: logging.Logger = logging.getLogger(__name__)


def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))


def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None


def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]


def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)

if abs_name.endswith("/"):
abs_name = abs_name[:-1]

name, extension = os.path.splitext(abs_name)

if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"


def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]


def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]


def get_abs_properties(
container_name: str,
blob_name: Optional[str],
Expand Down Expand Up @@ -280,7 +221,3 @@ def list_folders(
this_dict[folder_name] = folder_name

yield f"{folder_name}"


def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])
66 changes: 66 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/azure/abs_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import re
from typing import Optional

# This file should not import any abs spectific modules as we import it in path_spec.py in datat_lake_common.py

ABS_PREFIXES_REGEX = re.compile(
r"(http[s]?://[a-z0-9]{3,24}\.blob\.core\.windows\.net/)"
)


def is_abs_uri(uri: str) -> bool:
return bool(ABS_PREFIXES_REGEX.match(uri))


def get_abs_prefix(abs_uri: str) -> Optional[str]:
result = re.search(ABS_PREFIXES_REGEX, abs_uri)
if result and result.groups():
return result.group(1)
return None


def strip_abs_prefix(abs_uri: str) -> str:
# remove abs prefix https://<storage-account>.blob.core.windows.net
abs_prefix = get_abs_prefix(abs_uri)
if not abs_prefix:
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
length_abs_prefix = len(abs_prefix)
return abs_uri[length_abs_prefix:]


def make_abs_urn(abs_uri: str, env: str) -> str:
abs_name = strip_abs_prefix(abs_uri)

if abs_name.endswith("/"):
abs_name = abs_name[:-1]

name, extension = os.path.splitext(abs_name)

if extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:abs,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:abs,{abs_name},{env})"


def get_container_name(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/")[0]


def get_key_prefix(abs_uri: str) -> str:
if not is_abs_uri(abs_uri):
raise ValueError(
f"Not an Azure Blob Storage URI. Must match the following regular expression: {str(ABS_PREFIXES_REGEX)}"
)
return strip_abs_prefix(abs_uri).split("/", maxsplit=1)[1]


def get_container_relative_path(abs_uri: str) -> str:
return "/".join(strip_abs_prefix(abs_uri).split("/")[1:])
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
get_s3_prefix,
is_s3_uri,
)
from datahub.ingestion.source.azure.abs_util import (
from datahub.ingestion.source.azure.abs_utils import (
get_abs_prefix,
get_container_name,
get_container_relative_path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from datahub.configuration.common import ConfigModel
from datahub.ingestion.source.aws.s3_util import is_s3_uri
from datahub.ingestion.source.azure.abs_util import is_abs_uri
from datahub.ingestion.source.azure.abs_utils import is_abs_uri
from datahub.ingestion.source.gcs.gcs_utils import is_gcs_uri

# hide annoying debug errors from py4j
Expand Down

0 comments on commit 92f0daa

Please sign in to comment.