Skip to content

Commit

Permalink
ref: use make_dataset_urn
Browse files Browse the repository at this point in the history
  • Loading branch information
rinzool committed Jan 19, 2024
1 parent 6ec5702 commit 5bfd976
Showing 1 changed file with 7 additions and 31 deletions.
38 changes: 7 additions & 31 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import logging
import re
from functools import lru_cache
from typing import Dict, Iterable, List, Optional

Expand All @@ -10,7 +9,7 @@
from pydantic.fields import Field

from datahub.configuration import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.emitter.mce_builder import DEFAULT_ENV, make_dataset_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -72,8 +71,6 @@
"box_plot": ChartTypeClass.BAR,
}

RESERVED_CHARACTERS = set([","])


class SupersetConfig(StatefulIngestionConfigBase, ConfigModel):
# See the Superset /security/login endpoint for details
Expand Down Expand Up @@ -106,11 +103,6 @@ class SupersetConfig(StatefulIngestionConfigBase, ConfigModel):
description="Can be used to change mapping for database names in superset to what you have in datahub",
)

reserved_characters_replacement: str = Field(
default=None,
description=f"Can be used to replace reserved characters {RESERVED_CHARACTERS} in tables name so they can be ingested",
)

@validator("connect_uri", "display_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
Expand Down Expand Up @@ -224,36 +216,20 @@ def get_datasource_urn_from_id(self, datasource_id):
).json()
schema_name = dataset_response.get("result", {}).get("schema")
table_name = dataset_response.get("result", {}).get("table_name")
if table_name and RESERVED_CHARACTERS.intersection(table_name):
if not self.config.reserved_characters_replacement:
logger.warning(
f"Failed to ingest following dataset due to a reserved character {RESERVED_CHARACTERS} in table name: {schema_name}.{table_name}. "
"Consider using 'reserved_characters_replacement' argument to ingest dataset"
)
return None

table_name = re.sub(
rf"[{''.join(RESERVED_CHARACTERS)}]",
self.config.reserved_characters_replacement,
table_name,
)

database_id = dataset_response.get("result", {}).get("database", {}).get("id")
database_name = (
dataset_response.get("result", {}).get("database", {}).get("database_name")
)
database_name = self.config.database_alias.get(database_name, database_name)

if database_id and table_name:
platform = self.get_platform_from_database_id(database_id)
platform_urn = f"urn:li:dataPlatform:{platform}"
dataset_urn = (
f"urn:li:dataset:("
f"{platform_urn},{database_name + '.' if database_name else ''}"
f"{schema_name + '.' if schema_name else ''}"
f"{table_name},{self.config.env})"
return make_dataset_urn(
platform=self.get_platform_from_database_id(database_id),
name=".".join(
filter(lambda _: _, [schema_name, database_name, table_name])
),
env=self.config.env,
)
return dataset_urn
return None

def construct_dashboard_from_api_data(self, dashboard_data):
Expand Down

0 comments on commit 5bfd976

Please sign in to comment.