fix(ingestion/sigma): Fix multiple requests http errors #10616

Merged
32 changes: 27 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sigma/config.py
@@ -1,5 +1,5 @@
 import logging
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Dict, Optional
 
 import pydantic
@@ -26,7 +26,10 @@ class Constant:
     """
 
     # Rest API response key constants
+    REFRESH_TOKEN = "refresh_token"
+    ACCESS_TOKEN = "access_token"
     ENTRIES = "entries"
+    MEMBERID = "memberId"
     FIRSTNAME = "firstName"
     LASTNAME = "lastName"
     EDGES = "edges"
@@ -52,6 +55,11 @@ class Constant:
 @dataclass
 class SigmaSourceReport(StaleEntityRemovalSourceReport):
     number_of_workspaces: int = 0
+    non_accessible_workspaces_count: int = 0
+    shared_entities_count: int = 0
+    number_of_datasets: int = 0
+    number_of_workbooks: int = 0
+    number_of_files_metadata: Dict[str, int] = field(default_factory=dict)
 
     def report_number_of_workspaces(self, number_of_workspaces: int) -> None:
         self.number_of_workspaces = number_of_workspaces
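
The new report fields are plain counters plus a per-type dict; a minimal usage sketch (the increment sites are illustrative, only the field names come from this diff):

    report = SigmaSourceReport()
    report.number_of_datasets += 1
    report.number_of_workbooks += 1
    # number_of_files_metadata is keyed by file type, e.g. {"dataset": 3}.
    file_type = "dataset"  # illustrative key
    report.number_of_files_metadata[file_type] = (
        report.number_of_files_metadata.get(file_type, 0) + 1
    )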
@@ -75,15 +83,29 @@ class SigmaSourceConfig(
     workspace_pattern: AllowDenyPattern = pydantic.Field(
         default=AllowDenyPattern.allow_all(),
         description="Regex patterns to filter Sigma workspaces in ingestion."
-        "Mention 'User Folder' if entities of 'My documents' need to ingest.",
+        " Mention 'My documents' if personal entities also need to be ingested.",
     )
     ingest_owner: Optional[bool] = pydantic.Field(
         default=True,
-        description="Ingest Owner from source. This will override Owner info entered from UI",
+        description="Ingest Owner from source. This will override Owner info entered from UI.",
     )
+    ingest_shared_entities: Optional[bool] = pydantic.Field(
+        default=False,
+        description="Whether to ingest shared entities or not.",
+    )
+    extract_lineage: Optional[bool] = pydantic.Field(
+        default=True,
+        description="Whether to extract lineage of workbook's elements and datasets or not.",
+    )
+    workbook_lineage_pattern: AllowDenyPattern = pydantic.Field(
+        default=AllowDenyPattern.allow_all(),
+        description="Regex patterns to filter lineage of a workbook's elements and datasets in ingestion."
+        " Requires extract_lineage to be enabled.",
+    )
     chart_sources_platform_mapping: Dict[str, PlatformDetail] = pydantic.Field(
         default={},
         description="A mapping of the sigma workspace/workbook/chart folder path to all chart's data sources platform details present inside that folder path.",
     )
-
-    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
+    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
+        default=None, description="Sigma Stateful Ingestion Config."
+    )
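
For reference, a sketch of a config exercising the new options through parse_obj_allow_extras (the credential keys are placeholders, not taken from this diff):

    from datahub.ingestion.source.sigma.config import SigmaSourceConfig

    config = SigmaSourceConfig.parse_obj_allow_extras(
        {
            "client_id": "<client-id>",  # placeholder credential fields
            "client_secret": "<client-secret>",
            "ingest_shared_entities": True,  # also ingest entities shared with the user
            "extract_lineage": True,  # extract element/dataset lineage
            # restrict lineage extraction to matching workbooks
            "workbook_lineage_pattern": {"allow": ["Finance.*"]},
        }
    )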
metadata-ingestion/src/datahub/ingestion/source/sigma/data_classes.py
@@ -21,46 +21,56 @@ class Workspace(BaseModel):
     createdAt: datetime
     updatedAt: datetime
 
+    @root_validator(pre=True)
+    def update_values(cls, values: Dict) -> Dict:
+        # Update name if personal workspace
+        if values["name"] == "User Folder":
+            values["name"] = "My documents"
+        return values
+
 
 class SigmaDataset(BaseModel):
     datasetId: str
-    workspaceId: str
     name: str
     description: str
     createdBy: str
     createdAt: datetime
     updatedAt: datetime
     url: str
-    path: str
+    workspaceId: Optional[str] = None
+    path: Optional[str] = None
     badge: Optional[str] = None
 
-    @root_validator(pre=True)
-    def update_values(cls, values: Dict) -> Dict:
+    def get_urn_part(self):
         # As the element lineage API provides this id as the source dataset id
-        values["datasetId"] = values["url"].split("/")[-1]
-        return values
+        return self.url.split("/")[-1]
 
 
 class Element(BaseModel):
     elementId: str
-    type: str
     name: str
     url: str
+    type: Optional[str] = None
     vizualizationType: Optional[str] = None
     query: Optional[str] = None
     columns: List[str] = []
     upstream_sources: Dict[str, str] = {}
 
+    def get_urn_part(self):
+        return self.elementId
+
 
 class Page(BaseModel):
     pageId: str
     name: str
     elements: List[Element] = []
 
+    def get_urn_part(self):
+        return self.pageId
+
 
 class Workbook(BaseModel):
     workbookId: str
-    workspaceId: str
     name: str
     createdBy: str
     updatedBy: str
@@ -69,5 +79,16 @@ class Workbook(BaseModel):
     url: str
     path: str
     latestVersion: int
+    workspaceId: Optional[str] = None
     pages: List[Page] = []
     badge: Optional[str] = None
+
+
+class File(BaseModel):
+    id: str
+    name: str
+    parentId: str
+    path: str
+    type: str
+    badge: Optional[str] = None
+    workspaceId: Optional[str] = None
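
A small sketch of what the relaxed models now tolerate, assuming this file is data_classes.py; the payload is made up. Note that get_urn_part() derives the id from the URL tail instead of a pre-validator mutating datasetId:

    from datahub.ingestion.source.sigma.data_classes import SigmaDataset

    payload = {  # hypothetical API response without workspaceId or path
        "datasetId": "abc123",
        "name": "Orders",
        "description": "",
        "createdBy": "user-1",
        "createdAt": "2024-05-01T00:00:00Z",
        "updatedAt": "2024-05-02T00:00:00Z",
        "url": "https://app.sigmacomputing.com/org/dataset/abc123",
    }
    dataset = SigmaDataset.parse_obj(payload)
    assert dataset.workspaceId is None  # Optional fields tolerate the gap
    assert dataset.get_urn_part() == "abc123"  # id taken from the URL tail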
85 changes: 45 additions & 40 deletions metadata-ingestion/src/datahub/ingestion/source/sigma/sigma.py
@@ -110,15 +110,18 @@ def __init__(self, config: SigmaSourceConfig, ctx: PipelineContext):
         self.reporter = SigmaSourceReport()
         self.dataset_upstream_urn_mapping: Dict[str, List[str]] = {}
         try:
-            self.sigma_api = SigmaAPI(self.config)
+            self.sigma_api = SigmaAPI(self.config, self.reporter)
         except Exception as e:
             raise ConfigurationError(f"Unable to connect sigma API. Exception: {e}")
 
     @staticmethod
     def test_connection(config_dict: dict) -> TestConnectionReport:
         test_report = TestConnectionReport()
         try:
-            SigmaAPI(SigmaSourceConfig.parse_obj_allow_extras(config_dict))
+            SigmaAPI(
+                SigmaSourceConfig.parse_obj_allow_extras(config_dict),
+                SigmaSourceReport(),
+            )
             test_report.basic_connectivity = CapabilityReport(capable=True)
         except Exception as e:
             test_report.basic_connectivity = CapabilityReport(
@@ -175,9 +178,6 @@ def _gen_workspace_workunit(
             last_modified=int(workspace.updatedAt.timestamp() * 1000),
         )
 
-    def _get_sigma_dataset_identifier(self, dataset: SigmaDataset) -> str:
-        return dataset.datasetId
-
     def _gen_sigma_dataset_urn(self, dataset_identifier: str) -> str:
         return builder.make_dataset_urn_with_platform_instance(
             name=dataset_identifier,
@@ -201,9 +201,11 @@ def _gen_dataset_properties(
             externalUrl=dataset.url,
             created=TimeStamp(time=int(dataset.createdAt.timestamp() * 1000)),
             lastModified=TimeStamp(time=int(dataset.updatedAt.timestamp() * 1000)),
+            customProperties={"datasetId": dataset.datasetId},
             tags=[dataset.badge] if dataset.badge else None,
         )
-        dataset_properties.customProperties.update({"path": dataset.path})
+        if dataset.path:
+            dataset_properties.customProperties["path"] = dataset.path
         return MetadataChangeProposalWrapper(
             entityUrn=dataset_urn, aspect=dataset_properties
         ).as_workunit()
@@ -262,18 +264,18 @@ def _gen_entity_browsepath_aspect(
     def _gen_dataset_workunit(
         self, dataset: SigmaDataset
     ) -> Iterable[MetadataWorkUnit]:
-        dataset_identifier = self._get_sigma_dataset_identifier(dataset)
-        dataset_urn = self._gen_sigma_dataset_urn(dataset_identifier)
+        dataset_urn = self._gen_sigma_dataset_urn(dataset.get_urn_part())
 
         yield self._gen_entity_status_aspect(dataset_urn)
 
         yield self._gen_dataset_properties(dataset_urn, dataset)
 
-        yield from add_entity_to_container(
-            container_key=self._gen_workspace_key(dataset.workspaceId),
-            entity_type="dataset",
-            entity_urn=dataset_urn,
-        )
+        if dataset.workspaceId:
+            yield from add_entity_to_container(
+                container_key=self._gen_workspace_key(dataset.workspaceId),
+                entity_type="dataset",
+                entity_urn=dataset_urn,
+            )
 
         dpi_aspect = self._gen_dataplatform_instance_aspect(dataset_urn)
         if dpi_aspect:
@@ -288,15 +290,16 @@ def _gen_dataset_workunit(
             aspect=SubTypes(typeNames=[DatasetSubTypes.SIGMA_DATASET]),
         ).as_workunit()
 
-        paths = dataset.path.split("/")[1:]
-        if len(paths) > 0:
-            yield self._gen_entity_browsepath_aspect(
-                entity_urn=dataset_urn,
-                parent_entity_urn=builder.make_container_urn(
-                    self._gen_workspace_key(dataset.workspaceId)
-                ),
-                paths=paths,
-            )
+        if dataset.path and dataset.workspaceId:
+            paths = dataset.path.split("/")[1:]
+            if len(paths) > 0:
+                yield self._gen_entity_browsepath_aspect(
+                    entity_urn=dataset_urn,
+                    parent_entity_urn=builder.make_container_urn(
+                        self._gen_workspace_key(dataset.workspaceId)
+                    ),
+                    paths=paths,
+                )
 
         if dataset.badge:
             yield MetadataChangeProposalWrapper(
@@ -322,15 +325,15 @@ def _gen_dashboard_urn(self, dashboard_identifier: str) -> str:
         )
 
     def _gen_dashboard_info_workunit(self, page: Page) -> MetadataWorkUnit:
-        dashboard_urn = self._gen_dashboard_urn(page.pageId)
+        dashboard_urn = self._gen_dashboard_urn(page.get_urn_part())
         dashboard_info_cls = DashboardInfoClass(
             title=page.name,
             description="",
             charts=[
                 builder.make_chart_urn(
                     platform=self.platform,
                     platform_instance=self.config.platform_instance,
-                    name=element.elementId,
+                    name=element.get_urn_part(),
                 )
                 for element in page.elements
             ],
@@ -424,12 +427,12 @@ def _gen_elements_workunit(
             chart_urn = builder.make_chart_urn(
                 platform=self.platform,
                 platform_instance=self.config.platform_instance,
-                name=element.elementId,
+                name=element.get_urn_part(),
             )
 
             custom_properties = {
                 "VizualizationType": str(element.vizualizationType),
-                "type": str(element.type),
+                "type": str(element.type) if element.type else "Unknown",
             }
 
             yield self._gen_entity_status_aspect(chart_urn)
@@ -490,7 +493,7 @@ def _gen_pages_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUnit]:
         Map Sigma workbook page to Datahub dashboard
         """
         for page in workbook.pages:
-            dashboard_urn = self._gen_dashboard_urn(page.pageId)
+            dashboard_urn = self._gen_dashboard_urn(page.get_urn_part())
 
             yield self._gen_entity_status_aspect(dashboard_urn)
 
@@ -513,7 +516,7 @@ def _gen_pages_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUnit]:
                 )
 
             yield MetadataChangeProposalWrapper(
-                entityUrn=self._gen_dashboard_urn(page.pageId),
+                entityUrn=dashboard_urn,
                 aspect=InputFieldsClass(fields=all_input_fields),
             ).as_workunit()

@@ -522,11 +525,14 @@ def _gen_workbook_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUnit]:
         Map Sigma Workbook to Datahub container
         """
         owner_username = self.sigma_api.get_user_name(workbook.createdBy)
+        workbook_key = self._gen_workbook_key(workbook.workbookId)
         yield from gen_containers(
-            container_key=self._gen_workbook_key(workbook.workbookId),
+            container_key=workbook_key,
             name=workbook.name,
             sub_types=[BIContainerSubTypes.SIGMA_WORKBOOK],
-            parent_container_key=self._gen_workspace_key(workbook.workspaceId),
+            parent_container_key=self._gen_workspace_key(workbook.workspaceId)
+            if workbook.workspaceId
+            else None,
             extra_properties={
                 "path": workbook.path,
                 "latestVersion": str(workbook.latestVersion),
@@ -541,11 +547,9 @@ def _gen_workbook_workunit(self, workbook: Workbook) -> Iterable[MetadataWorkUnit]:
         )
 
         paths = workbook.path.split("/")[1:]
-        if len(paths) > 0:
+        if len(paths) > 0 and workbook.workspaceId:
             yield self._gen_entity_browsepath_aspect(
-                entity_urn=builder.make_container_urn(
-                    self._gen_workbook_key(workbook.workbookId),
-                ),
+                entity_urn=builder.make_container_urn(workbook_key),
                 parent_entity_urn=builder.make_container_urn(
                     self._gen_workspace_key(workbook.workspaceId)
                 ),
@@ -578,12 +582,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        Datahub Ingestion framework invokes this method
         """
         logger.info("Sigma plugin execution is started")
-        entities = self.sigma_api.get_sigma_entities()
-        for entity in entities:
-            if isinstance(entity, Workbook):
-                yield from self._gen_workbook_workunit(entity)
-            elif isinstance(entity, SigmaDataset):
-                yield from self._gen_dataset_workunit(entity)
+        self.sigma_api.fill_workspaces()
+
+        for dataset in self.sigma_api.get_sigma_datasets():
+            yield from self._gen_dataset_workunit(dataset)
+
+        for workbook in self.sigma_api.get_sigma_workbooks():
+            yield from self._gen_workbook_workunit(workbook)
 
         for workspace in self._get_allowed_workspaces():
             yield from self._gen_workspace_workunit(workspace)
         yield from self._gen_sigma_dataset_upstream_lineage_workunit()
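
The SigmaAPI changes themselves are not shown in this excerpt, but the new REFRESH_TOKEN and ACCESS_TOKEN constants point at the fix for the repeated-request HTTP errors: re-authenticating with a refresh token instead of letting the access token expire mid-run. A rough sketch under that assumption (the endpoint and function names are hypothetical):

    import requests

    from datahub.ingestion.source.sigma.config import Constant

    def refresh_access_token(
        session: requests.Session, token_url: str, refresh_token: str
    ) -> str:
        # Exchange the refresh token for a fresh access token and install it
        # on the session, so later API calls stop failing once the old token expires.
        response = session.post(
            token_url,
            data={
                "grant_type": Constant.REFRESH_TOKEN,
                Constant.REFRESH_TOKEN: refresh_token,
            },
        )
        response.raise_for_status()
        payload = response.json()
        session.headers["Authorization"] = f"Bearer {payload[Constant.ACCESS_TOKEN]}"
        # OAuth2 servers often rotate refresh tokens; return the latest one.
        return payload.get(Constant.REFRESH_TOKEN, refresh_token)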