diff --git a/README.md b/README.md index 15289f663f7a8..b3c2e2d545941 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,7 @@ Here are the companies that have officially adopted DataHub. Please feel free to - [Cabify](https://cabify.tech/) - [ClassDojo](https://www.classdojo.com/) - [Coursera](https://www.coursera.org/) +- [CVS Health](https://www.cvshealth.com/) - [DefinedCrowd](http://www.definedcrowd.com) - [DFDS](https://www.dfds.com/) - [Digital Turbine](https://www.digitalturbine.com/) diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapper.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapper.java index ffb14df5e800b..26835f9e57dcd 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapper.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/entitytype/EntityTypeMapper.java @@ -48,6 +48,7 @@ public class EntityTypeMapper { .put(EntityType.BUSINESS_ATTRIBUTE, Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME) .put(EntityType.QUERY, Constants.QUERY_ENTITY_NAME) .put(EntityType.POST, Constants.POST_ENTITY_NAME) + .put(EntityType.FORM, Constants.FORM_ENTITY_NAME) .build(); private static final Map ENTITY_NAME_TO_TYPE = diff --git a/datahub-web-react/src/Mocks.tsx b/datahub-web-react/src/Mocks.tsx index de471b6b9f2fb..aed672a34e7ca 100644 --- a/datahub-web-react/src/Mocks.tsx +++ b/datahub-web-react/src/Mocks.tsx @@ -1254,6 +1254,7 @@ export const glossaryNode5 = { export const sampleTag = { urn: 'urn:li:tag:abc-sample-tag', + type: EntityType.Tag, name: 'abc-sample-tag', description: 'sample tag description', ownership: { diff --git a/datahub-web-react/src/app/ingest/source/utils.ts b/datahub-web-react/src/app/ingest/source/utils.ts index 7e00522ffaf47..44630b4fca5c2 100644 --- a/datahub-web-react/src/app/ingest/source/utils.ts +++ b/datahub-web-react/src/app/ingest/source/utils.ts @@ -47,6 +47,7 @@ export const WARNING = 'WARNING'; export const FAILURE = 'FAILURE'; export const CONNECTION_FAILURE = 'CONNECTION_FAILURE'; export const CANCELLED = 'CANCELLED'; +export const ABORTED = 'ABORTED'; export const UP_FOR_RETRY = 'UP_FOR_RETRY'; export const ROLLING_BACK = 'ROLLING_BACK'; export const ROLLED_BACK = 'ROLLED_BACK'; @@ -68,6 +69,7 @@ export const getExecutionRequestStatusIcon = (status: string) => { (status === ROLLED_BACK && WarningOutlined) || (status === ROLLING_BACK && LoadingOutlined) || (status === ROLLBACK_FAILED && CloseCircleOutlined) || + (status === ABORTED && CloseCircleOutlined) || ClockCircleOutlined ); }; @@ -83,6 +85,7 @@ export const getExecutionRequestStatusDisplayText = (status: string) => { (status === ROLLED_BACK && 'Rolled Back') || (status === ROLLING_BACK && 'Rolling Back') || (status === ROLLBACK_FAILED && 'Rollback Failed') || + (status === ABORTED && 'Aborted') || status ); }; @@ -105,6 +108,8 @@ export const getExecutionRequestSummaryText = (status: string) => { return 'Ingestion is in the process of rolling back.'; case ROLLBACK_FAILED: return 'Ingestion rollback failed.'; + case ABORTED: + return 'Ingestion job got aborted due to worker restart.'; default: return 'Ingestion status not recognized.'; } @@ -121,6 +126,7 @@ export const getExecutionRequestStatusDisplayColor = (status: string) => { (status === ROLLED_BACK && 'orange') || (status === ROLLING_BACK && 'orange') || (status === ROLLBACK_FAILED && 'red') || + (status === ABORTED && 'red') || ANTD_GRAY[7] ); }; diff --git 
a/datahub-web-react/src/graphql/container.graphql b/datahub-web-react/src/graphql/container.graphql index 749c1c9172b6d..4b3ecfe8aaaff 100644 --- a/datahub-web-react/src/graphql/container.graphql +++ b/datahub-web-react/src/graphql/container.graphql @@ -1,6 +1,7 @@ query getContainer($urn: String!) { container(urn: $urn) { urn + type exists lastIngested platform { diff --git a/datahub-web-react/src/graphql/dashboard.graphql b/datahub-web-react/src/graphql/dashboard.graphql index 68a966a68e00a..681c98f361ccb 100644 --- a/datahub-web-react/src/graphql/dashboard.graphql +++ b/datahub-web-react/src/graphql/dashboard.graphql @@ -1,5 +1,7 @@ query getDashboard($urn: String!) { dashboard(urn: $urn) { + urn + type ...dashboardFields privileges { ...entityPrivileges diff --git a/datahub-web-react/src/graphql/dataJob.graphql b/datahub-web-react/src/graphql/dataJob.graphql index 78247bd460fbb..836aac35deaf5 100644 --- a/datahub-web-react/src/graphql/dataJob.graphql +++ b/datahub-web-react/src/graphql/dataJob.graphql @@ -1,5 +1,7 @@ query getDataJob($urn: String!) { dataJob(urn: $urn) { + urn + type ...dataJobFields privileges { ...entityPrivileges diff --git a/datahub-web-react/src/graphql/dataPlatform.graphql b/datahub-web-react/src/graphql/dataPlatform.graphql index 6281cf155a5d2..44acbf6737fae 100644 --- a/datahub-web-react/src/graphql/dataPlatform.graphql +++ b/datahub-web-react/src/graphql/dataPlatform.graphql @@ -1,5 +1,7 @@ query getDataPlatform($urn: String!) { dataPlatform(urn: $urn) { + urn + type ...platformFields } } diff --git a/datahub-web-react/src/graphql/dataProduct.graphql b/datahub-web-react/src/graphql/dataProduct.graphql index eb053ca956131..623ece13dbfc1 100644 --- a/datahub-web-react/src/graphql/dataProduct.graphql +++ b/datahub-web-react/src/graphql/dataProduct.graphql @@ -1,5 +1,7 @@ query getDataProduct($urn: String!) { dataProduct(urn: $urn) { + urn + type ...dataProductFields privileges { ...entityPrivileges diff --git a/datahub-web-react/src/graphql/dataset.graphql b/datahub-web-react/src/graphql/dataset.graphql index 1ca25a6ba3bf6..fcca919f61423 100644 --- a/datahub-web-react/src/graphql/dataset.graphql +++ b/datahub-web-react/src/graphql/dataset.graphql @@ -1,6 +1,7 @@ query getDataProfiles($urn: String!, $limit: Int, $startTime: Long, $endTime: Long) { dataset(urn: $urn) { urn + type datasetProfiles(limit: $limit, startTimeMillis: $startTime, endTimeMillis: $endTime) { rowCount columnCount diff --git a/datahub-web-react/src/graphql/mlFeature.graphql b/datahub-web-react/src/graphql/mlFeature.graphql index d6a75e16b86f1..2ed5ecfb37fda 100644 --- a/datahub-web-react/src/graphql/mlFeature.graphql +++ b/datahub-web-react/src/graphql/mlFeature.graphql @@ -1,5 +1,7 @@ query getMLFeature($urn: String!) { mlFeature(urn: $urn) { + urn + type ...nonRecursiveMLFeature privileges { ...entityPrivileges diff --git a/datahub-web-react/src/graphql/mlFeatureTable.graphql b/datahub-web-react/src/graphql/mlFeatureTable.graphql index a6e069c120518..02efbaf9766e1 100644 --- a/datahub-web-react/src/graphql/mlFeatureTable.graphql +++ b/datahub-web-react/src/graphql/mlFeatureTable.graphql @@ -1,5 +1,7 @@ query getMLFeatureTable($urn: String!) 
{ mlFeatureTable(urn: $urn) { + urn + type ...nonRecursiveMLFeatureTable privileges { ...entityPrivileges diff --git a/datahub-web-react/src/graphql/mlModel.graphql b/datahub-web-react/src/graphql/mlModel.graphql index 1626bc473213a..2192888caef70 100644 --- a/datahub-web-react/src/graphql/mlModel.graphql +++ b/datahub-web-react/src/graphql/mlModel.graphql @@ -1,5 +1,7 @@ query getMLModel($urn: String!) { mlModel(urn: $urn) { + urn + type ...nonRecursiveMLModel features: relationships(input: { types: ["Consumes"], direction: OUTGOING, start: 0, count: 100 }) { start diff --git a/datahub-web-react/src/graphql/mlModelGroup.graphql b/datahub-web-react/src/graphql/mlModelGroup.graphql index 8ae049c8c0b1d..81ab65d0b9a08 100644 --- a/datahub-web-react/src/graphql/mlModelGroup.graphql +++ b/datahub-web-react/src/graphql/mlModelGroup.graphql @@ -1,5 +1,7 @@ query getMLModelGroup($urn: String!) { mlModelGroup(urn: $urn) { + urn + type ...nonRecursiveMLModelGroupFields incoming: relationships( input: { diff --git a/datahub-web-react/src/graphql/mlPrimaryKey.graphql b/datahub-web-react/src/graphql/mlPrimaryKey.graphql index 599c4d7fabcac..d39f9d3fbdfa2 100644 --- a/datahub-web-react/src/graphql/mlPrimaryKey.graphql +++ b/datahub-web-react/src/graphql/mlPrimaryKey.graphql @@ -1,5 +1,7 @@ query getMLPrimaryKey($urn: String!) { mlPrimaryKey(urn: $urn) { + urn + type ...nonRecursiveMLPrimaryKey privileges { ...entityPrivileges diff --git a/datahub-web-react/src/graphql/tag.graphql b/datahub-web-react/src/graphql/tag.graphql index 031d923276bfe..0bf0953b15fbe 100644 --- a/datahub-web-react/src/graphql/tag.graphql +++ b/datahub-web-react/src/graphql/tag.graphql @@ -1,6 +1,7 @@ query getTag($urn: String!) { tag(urn: $urn) { urn + type name description properties { diff --git a/datahub-web-react/src/graphql/user.graphql b/datahub-web-react/src/graphql/user.graphql index a8a4e90284956..030ef85df7124 100644 --- a/datahub-web-react/src/graphql/user.graphql +++ b/datahub-web-react/src/graphql/user.graphql @@ -1,6 +1,7 @@ query getUser($urn: String!, $groupsCount: Int!) { corpUser(urn: $urn) { urn + type username isNativeUser exists diff --git a/docs/managed-datahub/managed-datahub-overview.md b/docs/managed-datahub/managed-datahub-overview.md index 867b03501e0e0..087238097dd9f 100644 --- a/docs/managed-datahub/managed-datahub-overview.md +++ b/docs/managed-datahub/managed-datahub-overview.md @@ -51,10 +51,17 @@ know. 
| Feature | DataHub | Acryl DataHub | | ---------------------------------------------- | ------- | ------------- | | Surface data quality results | ✅ | ✅ | -| Build and enforce continuous data SLAs | ❌ | ✅ | -| Continuous monitoring of dataset health | ❌ | ✅ | -| Data observability alerts and notifications | ❌ | ✅ | -| Data Incident management | ❌ | ✅ | +| Create data contracts | ✅ | ✅ | +| Raise and Resolve Data Incidents | ✅ | ✅ | +| Monitor Freshness SLAs | ❌ | ✅ | +| Monitor Table Schemas | ❌ | ✅ | +| Monitor Table Volume | ❌ | ✅ | +| Validate Table Columns | ❌ | ✅ | +| Receive Notifications via Email & Slack | ❌ | ✅ | +| Manage Data Incidents via Slack | ❌ | ✅ | +| View Data Health Dashboard | ❌ | ✅ | +| Evaluate data quality checks on-demand (API + UI) | ❌ | ✅ | +| Evaluate data quality checks in your VPC | ❌ | ✅ | ## Enterprise Grade diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index b8db746a63fdb..41c04ca4a433c 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -345,6 +345,7 @@ "flask-openid>=1.3.0", "dask[dataframe]<2024.7.0", }, + "grafana": {"requests"}, "glue": aws_common, # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported "hana": sql_common @@ -635,6 +636,7 @@ "dynamodb = datahub.ingestion.source.dynamodb.dynamodb:DynamoDBSource", "elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource", "feast = datahub.ingestion.source.feast:FeastRepositorySource", + "grafana = datahub.ingestion.source.grafana.grafana_source:GrafanaSource", "glue = datahub.ingestion.source.aws.glue:GlueSource", "sagemaker = datahub.ingestion.source.aws.sagemaker:SagemakerSource", "hana = datahub.ingestion.source.sql.hana:HanaSource", diff --git a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py index c71bced38f8aa..afeedb83f7998 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py +++ b/metadata-ingestion/src/datahub/api/entities/dataset/dataset.py @@ -162,6 +162,7 @@ class Dataset(BaseModel): structured_properties: Optional[ Dict[str, Union[str, float, List[Union[str, float]]]] ] = None + external_url: Optional[str] = None @property def platform_urn(self) -> str: @@ -236,6 +237,7 @@ def generate_mcp( description=self.description, name=self.name, customProperties=self.properties, + externalUrl=self.external_url, ), ) yield mcp diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 3173dfa302399..8843a0ad8eae6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -1216,8 +1216,22 @@ def _generate_single_profile( except Exception as e: if not self.config.catch_exceptions: raise e - logger.exception(f"Encountered exception while profiling {pretty_name}") - self.report.report_warning(pretty_name, f"Profiling exception {e}") + + error_message = str(e).lower() + if "permission denied" in error_message: + self.report.warning( + title="Unauthorized to extract data profile statistics", + message="We were denied access while attempting to generate profiling statistics for some assets. 
Please ensure the provided user has permission to query these tables and views.", + context=f"Asset: {pretty_name}", + exc=e, + ) + else: + self.report.warning( + title="Failed to extract statistics for some assets", + message="Caught unexpected exception while attempting to extract profiling statistics for some assets.", + context=f"Asset: {pretty_name}", + exc=e, + ) return None finally: if batch is not None and self.base_engine.engine.name == TRINO: diff --git a/metadata-ingestion/src/datahub/ingestion/source/grafana/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/grafana/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_source.py b/metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_source.py new file mode 100644 index 0000000000000..53f71046c25c0 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_source.py @@ -0,0 +1,131 @@ +from typing import Iterable, List, Optional + +import requests +from pydantic import Field, SecretStr + +import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import PlatformInstanceConfigMixin +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import MetadataWorkUnitProcessor +from datahub.ingestion.api.source_helpers import auto_workunit +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.state.stale_entity_removal_handler import ( + StaleEntityRemovalHandler, + StaleEntityRemovalSourceReport, + StatefulIngestionConfigBase, +) +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionReport, + StatefulIngestionSourceBase, +) +from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps +from datahub.metadata.schema_classes import DashboardInfoClass, StatusClass + + +class GrafanaSourceConfig(StatefulIngestionConfigBase, PlatformInstanceConfigMixin): + url: str = Field( + default="", + description="Grafana URL in the format http://your-grafana-instance with no trailing slash", + ) + service_account_token: SecretStr = Field( + description="Service account token for Grafana" + ) + + +class GrafanaReport(StaleEntityRemovalSourceReport): + pass + + +@platform_name("Grafana") +@config_class(GrafanaSourceConfig) +@support_status(SupportStatus.TESTING) +class GrafanaSource(StatefulIngestionSourceBase): + """ + This is an experimental source for Grafana. 
+ Currently only ingests dashboards (no charts) + """ + + def __init__(self, config: GrafanaSourceConfig, ctx: PipelineContext): + super().__init__(config, ctx) + self.source_config = config + self.report = GrafanaReport() + self.platform = "grafana" + + @classmethod + def create(cls, config_dict, ctx): + config = GrafanaSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: + return [ + *super().get_workunit_processors(), + StaleEntityRemovalHandler.create( + self, self.source_config, self.ctx + ).workunit_processor, + ] + + def get_report(self) -> StatefulIngestionReport: + return self.report + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + headers = { + "Authorization": f"Bearer {self.source_config.service_account_token.get_secret_value()}", + "Content-Type": "application/json", + } + try: + response = requests.get( + f"{self.source_config.url}/api/search", headers=headers + ) + response.raise_for_status() + except requests.exceptions.RequestException as e: + self.report.report_failure(f"Failed to fetch dashboards: {str(e)}") + return + res_json = response.json() + for item in res_json: + uid = item["uid"] + title = item["title"] + url_path = item["url"] + full_url = f"{self.source_config.url}{url_path}" + dashboard_urn = builder.make_dashboard_urn( + platform=self.platform, + name=uid, + platform_instance=self.source_config.platform_instance, + ) + + yield from auto_workunit( + MetadataChangeProposalWrapper.construct_many( + entityUrn=dashboard_urn, + aspects=[ + DashboardInfoClass( + description="", + title=title, + charts=[], + lastModified=ChangeAuditStamps(), + externalUrl=full_url, + customProperties={ + key: str(value) + for key, value in { + "displayName": title, + "id": item["id"], + "uid": uid, + "title": title, + "uri": item["uri"], + "type": item["type"], + "folderId": item.get("folderId"), + "folderUid": item.get("folderUid"), + "folderTitle": item.get("folderTitle"), + }.items() + if value is not None + }, + ), + StatusClass(removed=False), + ], + ) + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py new file mode 100644 index 0000000000000..43ad5bfcefdf1 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/exception.py @@ -0,0 +1,65 @@ +from typing import Callable, Iterable, TypeVar, Union + +import redshift_connector +from typing_extensions import ParamSpec + +from datahub.ingestion.source.redshift.report import RedshiftReport + +T = TypeVar("T") +P = ParamSpec("P") + + +def handle_redshift_exceptions( + report: RedshiftReport, + func: Callable[P, T], + *args: P.args, + **kwargs: P.kwargs, +) -> Union[T, None]: + try: + return func(*args, **kwargs) + except redshift_connector.Error as e: + report_redshift_failure(report, e) + return None + + +def handle_redshift_exceptions_yield( + report: RedshiftReport, + func: Callable[P, Iterable[T]], + *args: P.args, + **kwargs: P.kwargs, +) -> Iterable[T]: + try: + yield from func(*args, **kwargs) + except redshift_connector.Error as e: + report_redshift_failure(report, e) + + +def report_redshift_failure( + report: RedshiftReport, e: redshift_connector.Error +) -> None: + error_message = str(e).lower() + if "permission denied" in error_message: + if "svv_table_info" in error_message: + report.report_failure( + title="Permission denied", + message="Failed to extract metadata due to 
insufficient permission to access 'svv_table_info' table. Please ensure the provided database user has access.", + exc=e, + ) + elif "svl_user_info" in error_message: + report.report_failure( + title="Permission denied", + message="Failed to extract metadata due to insufficient permission to access 'svl_user_info' table. Please ensure the provided database user has access.", + exc=e, + ) + else: + report.report_failure( + title="Permission denied", + message="Failed to extract metadata due to insufficient permissions.", + exc=e, + ) + else: + report.report_failure( + title="Failed to extract some metadata", + message="Failed to extract some metadata from Redshift.", + exc=e, + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py index 526e5e2cf12d0..3d6c746183fd9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py @@ -1,6 +1,5 @@ import collections import logging -import traceback from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, Union import redshift_connector @@ -249,8 +248,10 @@ def _populate_lineage_agg( processor(lineage_row) except Exception as e: self.report.warning( - f"lineage-v2-extract-{lineage_type.name}", - f"Error was {e}, {traceback.format_exc()}", + title="Failed to extract some lineage", + message=f"Failed to extract lineage of type {lineage_type.name}", + context=f"Query: '{query}'", + exc=e, ) self._lineage_v1.report_status(f"extract-{lineage_type.name}", False) @@ -417,3 +418,9 @@ def _process_external_tables( def generate(self) -> Iterable[MetadataWorkUnit]: for mcp in self.aggregator.gen_metadata(): yield mcp.as_workunit() + if len(self.aggregator.report.observed_query_parse_failures) > 0: + self.report.report_failure( + title="Failed to extract some SQL lineage", + message="Unexpected error(s) while attempting to extract lineage from SQL queries. See the full logs for more details.", + context=f"Query Parsing Failures: {self.aggregator.report.observed_query_parse_failures}", + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index eed82ec4d83e7..6f611fa674187 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -48,15 +48,26 @@ def get_workunits( if not self.config.schema_pattern.allowed(schema): continue for table in tables[db].get(schema, {}): - if ( - not self.config.profiling.profile_external_tables - and table.type == "EXTERNAL_TABLE" - ): - self.report.profiling_skipped_other[schema] += 1 - logger.info( - f"Skipping profiling of external table {db}.{schema}.{table.name}" - ) - continue + if table.type == "EXTERNAL_TABLE": + if not self.config.profiling.profile_external_tables: + # Case 1: If user did not tell us to profile external tables, simply log this. + self.report.profiling_skipped_other[schema] += 1 + logger.info( + f"Skipping profiling of external table {db}.{schema}.{table.name}" + ) + # Continue, since we should not profile this table. + continue + elif self.config.profiling.profile_table_level_only: + # Case 2: User DID tell us to profile external tables, but only at the table level. + # Currently, we do not support this combination. 
The user needs to also set + # profile_table_level_only to False in order to profile. + self.report.report_warning( + title="Skipped profiling for external tables", + message="External tables are not supported for profiling when 'profile_table_level_only' config is set to 'True'. Please set 'profile_table_level_only' to 'False' in order to profile external Redshift tables.", + context=f"External Table: {db}.{schema}.{table.name}", + ) + # Continue, since we were unable to retrieve cheap profiling stats from svv_table_info. + continue # Emit the profile work unit profile_request = self.get_profile_request(table, schema, db) if profile_request is not None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index 3bd69d72be605..affbcd00b5107 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -122,6 +122,7 @@ def list_tables( else: return f"{tables_query} UNION {external_tables_query}" + # Why is this unused. Is this a bug? list_columns: str = """ SELECT n.nspname as "schema", diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 68821662762b6..a6ffed65aaa70 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -45,6 +45,7 @@ DatasetSubTypes, ) from datahub.ingestion.source.redshift.config import RedshiftConfig +from datahub.ingestion.source.redshift.exception import handle_redshift_exceptions_yield from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor from datahub.ingestion.source.redshift.lineage_v2 import RedshiftSqlLineageV2 from datahub.ingestion.source.redshift.profile import RedshiftProfiler @@ -411,7 +412,12 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: - connection = RedshiftSource.get_redshift_connection(self.config) + connection = self._try_get_redshift_connection(self.config) + + if connection is None: + # If we failed to establish a connection, short circuit the connector. + return + database = self.config.database logger.info(f"Processing db {database}") self.report.report_ingestion_stage_start(METADATA_EXTRACTION) @@ -419,9 +425,20 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit self.db_views[database] = defaultdict() self.db_schemas.setdefault(database, {}) + # TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping + # this fallback. For now, this gets us broad coverage quickly. 
+ yield from handle_redshift_exceptions_yield( + self.report, self._extract_metadata, connection, database + ) + + def _extract_metadata( + self, connection: redshift_connector.Connection, database: str + ) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: + yield from self.gen_database_container( database=database, ) + self.cache_tables_and_views(connection, database) self.report.tables_in_mem_size[database] = humanfriendly.format_size( @@ -556,6 +573,7 @@ def process_schema( ): for table in self.db_tables[schema.database][schema.name]: table.columns = schema_columns[schema.name].get(table.name, []) + table.column_count = len(table.columns) table_wu_generator = self._process_table( table, database=database ) @@ -575,8 +593,10 @@ def process_schema( f"Table processed: {schema.database}.{schema.name}.{table.name}" ) else: - logger.info( - f"No tables in cache for {schema.database}.{schema.name}, skipping" + self.report.info( + title="No tables found in some schemas", + message="No tables found in some schemas. This may be due to insufficient privileges for the provided user.", + context=f"Schema: {schema.database}.{schema.name}", ) else: logger.info("Table processing disabled, skipping") @@ -589,6 +609,7 @@ def process_schema( ): for view in self.db_views[schema.database][schema.name]: view.columns = schema_columns[schema.name].get(view.name, []) + view.column_count = len(view.columns) yield from self._process_view( table=view, database=database, schema=schema ) @@ -603,8 +624,10 @@ def process_schema( f"Table processed: {schema.database}.{schema.name}.{view.name}" ) else: - logger.info( - f"No views in cache for {schema.database}.{schema.name}, skipping" + self.report.info( + title="No views found in some schemas", + message="No views found in some schemas. This may be due to insufficient privileges for the provided user.", + context=f"Schema: {schema.database}.{schema.name}", ) else: logger.info("View processing disabled, skipping") @@ -1088,3 +1111,43 @@ def add_config_to_report(self): self.config.start_time, self.config.end_time, ) + + def _try_get_redshift_connection( + self, + config: RedshiftConfig, + ) -> Optional[redshift_connector.Connection]: + try: + return RedshiftSource.get_redshift_connection(config) + except redshift_connector.Error as e: + error_message = str(e).lower() + if "password authentication failed" in error_message: + self.report.report_failure( + title="Invalid credentials", + message="Failed to connect to Redshift. Please verify your username, password, and database.", + exc=e, + ) + elif "timeout" in error_message: + self.report.report_failure( + title="Unable to connect", + message="Failed to connect to Redshift. Please verify your host name and port number.", + exc=e, + ) + elif "communication error" in error_message: + self.report.report_failure( + title="Unable to connect", + message="Failed to connect to Redshift. Please verify that the host name is valid and reachable.", + exc=e, + ) + elif "database" in error_message and "does not exist" in error_message: + self.report.report_failure( + title="Database does not exist", + message="Failed to connect to Redshift. Please verify that the provided database exists and the provided user has access to it.", + exc=e, + ) + else: + self.report.report_failure( + title="Unable to connect", + message="Failed to connect to Redshift. 
Please verify your connection details.", + exc=e, + ) + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py index 101146563e8e7..6e88a50f898a5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py @@ -169,6 +169,8 @@ def enrich_tables( self, conn: redshift_connector.Connection, ) -> Dict[str, Dict[str, RedshiftExtraTableMeta]]: + # Warning: This table enrichment will not return anything for + # external tables (spectrum) and for tables that have never been queried / written to. cur = RedshiftDataDictionary.get_query_result( conn, self.queries.additional_table_metadata_query() ) @@ -207,7 +209,7 @@ def get_tables_and_views( # This query needs to run separately as we can't join with the main query because it works with # driver only functions. - enriched_table = self.enrich_tables(conn) + enriched_tables = self.enrich_tables(conn) cur = RedshiftDataDictionary.get_query_result( conn, @@ -216,6 +218,7 @@ def get_tables_and_views( field_names = [i[0] for i in cur.description] db_tables = cur.fetchall() logger.info(f"Fetched {len(db_tables)} tables/views from Redshift") + for table in db_tables: schema = table[field_names.index("schema")] table_name = table[field_names.index("relname")] @@ -233,7 +236,7 @@ def get_tables_and_views( rows_count, size_in_bytes, ) = RedshiftDataDictionary.get_table_stats( - enriched_table, field_names, schema, table + enriched_tables, field_names, schema, table ) tables[schema].append( @@ -263,15 +266,15 @@ def get_tables_and_views( rows_count, size_in_bytes, ) = RedshiftDataDictionary.get_table_stats( - enriched_table=enriched_table, + enriched_tables=enriched_tables, field_names=field_names, schema=schema, table=table, ) materialized = False - if schema in enriched_table and table_name in enriched_table[schema]: - if enriched_table[schema][table_name].is_materialized: + if schema in enriched_tables and table_name in enriched_tables[schema]: + if enriched_tables[schema][table_name].is_materialized: materialized = True views[schema].append( @@ -298,7 +301,7 @@ def get_tables_and_views( return tables, views @staticmethod - def get_table_stats(enriched_table, field_names, schema, table): + def get_table_stats(enriched_tables, field_names, schema, table): table_name = table[field_names.index("relname")] creation_time: Optional[datetime] = None @@ -309,25 +312,41 @@ def get_table_stats(enriched_table, field_names, schema, table): last_altered: Optional[datetime] = None size_in_bytes: Optional[int] = None rows_count: Optional[int] = None - if schema in enriched_table and table_name in enriched_table[schema]: - if enriched_table[schema][table_name].last_accessed: + if schema in enriched_tables and table_name in enriched_tables[schema]: + if enriched_tables[schema][table_name].last_accessed is not None: # Mypy seems to be not clever enough to understand the above check - last_accessed = enriched_table[schema][table_name].last_accessed + last_accessed = enriched_tables[schema][table_name].last_accessed assert last_accessed last_altered = last_accessed.replace(tzinfo=timezone.utc) elif creation_time: last_altered = creation_time - if enriched_table[schema][table_name].size: + if enriched_tables[schema][table_name].size is not None: # Mypy seems to be not clever enough to understand the above check - size = 
enriched_table[schema][table_name].size + size = enriched_tables[schema][table_name].size if size: size_in_bytes = size * 1024 * 1024 - if enriched_table[schema][table_name].estimated_visible_rows: - rows = enriched_table[schema][table_name].estimated_visible_rows + if enriched_tables[schema][table_name].estimated_visible_rows is not None: + rows = enriched_tables[schema][table_name].estimated_visible_rows assert rows rows_count = int(rows) + else: + # The object was not found in the enriched data. + # + # If we don't have enriched data, it may be either because: + # 1 The table is empty (as per https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_TABLE_INFO.html) empty tables are omitted from svv_table_info. + # 2. The table is external + # 3. The table is a view (non-materialized) + # + # In case 1, we want to report an accurate profile suggesting that the table is empty. + # In case 2, do nothing since we cannot cheaply profile + # In case 3, do nothing since we cannot cheaply profile + if table[field_names.index("tabletype")] == "TABLE": + rows_count = 0 + size_in_bytes = 0 + logger.info("Found some tables with no profiles need to return 0") + return creation_time, last_altered, rows_count, size_in_bytes @staticmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 968989e2548d1..9c8e475e7b307 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -92,16 +92,25 @@ def generate_profile_workunits( request for request in requests if request.profile_table_level_only ] for request in table_level_profile_requests: - table_level_profile = DatasetProfile( - timestampMillis=int(datetime.now().timestamp() * 1000), - columnCount=request.table.column_count, - rowCount=request.table.rows_count, - sizeInBytes=request.table.size_in_bytes, - ) - dataset_urn = self.dataset_urn_builder(request.pretty_name) - yield MetadataChangeProposalWrapper( - entityUrn=dataset_urn, aspect=table_level_profile - ).as_workunit() + if ( + request.table.column_count is None + and request.table.rows_count is None + and request.table.size_in_bytes is None + ): + logger.warning( + f"Table {request.pretty_name} has no column count, rows count, or size in bytes. Skipping emitting table level profile." 
+ ) + else: + table_level_profile = DatasetProfile( + timestampMillis=int(datetime.now().timestamp() * 1000), + columnCount=request.table.column_count, + rowCount=request.table.rows_count, + sizeInBytes=request.table.size_in_bytes, + ) + dataset_urn = self.dataset_urn_builder(request.pretty_name) + yield MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=table_level_profile + ).as_workunit() if not ge_profile_requests: return diff --git a/metadata-ingestion/tests/integration/grafana/default-dashboard.json b/metadata-ingestion/tests/integration/grafana/default-dashboard.json new file mode 100644 index 0000000000000..8ce40ad6acb13 --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/default-dashboard.json @@ -0,0 +1,25 @@ +{ + "id": null, + "uid": "default", + "title": "Default Dashboard", + "tags": [], + "timezone": "browser", + "schemaVersion": 16, + "version": 0, + "panels": [ + { + "type": "text", + "title": "Welcome", + "gridPos": { + "x": 0, + "y": 0, + "w": 24, + "h": 5 + }, + "options": { + "content": "Welcome to your Grafana dashboard!", + "mode": "markdown" + } + } + ] +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/grafana/docker-compose.yml b/metadata-ingestion/tests/integration/grafana/docker-compose.yml new file mode 100644 index 0000000000000..41995a1d49da6 --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/docker-compose.yml @@ -0,0 +1,32 @@ +version: '3.7' + +services: + grafana: + image: grafana/grafana:latest + container_name: grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_SECURITY_ADMIN_USER=admin + - GF_PATHS_PROVISIONING=/etc/grafana/provisioning + volumes: + - grafana-storage:/var/lib/grafana + - ./provisioning:/etc/grafana/provisioning + - ./default-dashboard.json:/var/lib/grafana/dashboards/default-dashboard.json + depends_on: + - postgres + + postgres: + image: postgres:13 + container_name: grafana-postgres + environment: + POSTGRES_DB: grafana + POSTGRES_USER: grafana + POSTGRES_PASSWORD: grafana + volumes: + - postgres-storage:/var/lib/postgresql/data + +volumes: + grafana-storage: + postgres-storage: diff --git a/metadata-ingestion/tests/integration/grafana/grafana_mcps_golden.json b/metadata-ingestion/tests/integration/grafana/grafana_mcps_golden.json new file mode 100644 index 0000000000000..1447e840eac8c --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/grafana_mcps_golden.json @@ -0,0 +1,56 @@ +[ +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(grafana,default)", + "changeType": "UPSERT", + "aspectName": "dashboardInfo", + "aspect": { + "json": { + "customProperties": { + "displayName": "Default Dashboard", + "id": "1", + "uid": "default", + "title": "Default Dashboard", + "uri": "db/default-dashboard", + "type": "dash-db" + }, + "externalUrl": "http://localhost:3000/d/default/default-dashboard", + "title": "Default Dashboard", + "description": "", + "charts": [], + "datasets": [], + "lastModified": { + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + } + }, + "systemMetadata": { + "lastObserved": 1720785600000, + "runId": "grafana-test-simple", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(grafana,default)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 
1720785600000, + "runId": "grafana-test-simple", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/grafana/provisioning/api-keys/api_keys.yaml b/metadata-ingestion/tests/integration/grafana/provisioning/api-keys/api_keys.yaml new file mode 100644 index 0000000000000..7ef096b6bfe97 --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/provisioning/api-keys/api_keys.yaml @@ -0,0 +1,3 @@ +api_keys: + - name: 'example-api-key' + role: 'Admin' diff --git a/metadata-ingestion/tests/integration/grafana/provisioning/dashboards/dashboard.yaml b/metadata-ingestion/tests/integration/grafana/provisioning/dashboards/dashboard.yaml new file mode 100644 index 0000000000000..e6d4aa3a45a39 --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/provisioning/dashboards/dashboard.yaml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + updateIntervalSeconds: 10 + options: + path: /var/lib/grafana/dashboards diff --git a/metadata-ingestion/tests/integration/grafana/provisioning/datasources/datasource.yaml b/metadata-ingestion/tests/integration/grafana/provisioning/datasources/datasource.yaml new file mode 100644 index 0000000000000..9ba65ec1a54bc --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/provisioning/datasources/datasource.yaml @@ -0,0 +1,12 @@ +apiVersion: 1 + +datasources: + - name: PostgreSQL + type: postgres + access: proxy + url: postgres:5432 + database: grafana + user: grafana + password: grafana + jsonData: + sslmode: disable diff --git a/metadata-ingestion/tests/integration/grafana/provisioning/service_accounts/service_accounts.yaml b/metadata-ingestion/tests/integration/grafana/provisioning/service_accounts/service_accounts.yaml new file mode 100644 index 0000000000000..a6c259aac77ab --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/provisioning/service_accounts/service_accounts.yaml @@ -0,0 +1,6 @@ +service_accounts: + - name: 'example-service-account' + role: 'Admin' + apiKeys: + - keyName: 'example-api-key' + role: 'Admin' diff --git a/metadata-ingestion/tests/integration/grafana/test_grafana.py b/metadata-ingestion/tests/integration/grafana/test_grafana.py new file mode 100644 index 0000000000000..6eb6b0b850926 --- /dev/null +++ b/metadata-ingestion/tests/integration/grafana/test_grafana.py @@ -0,0 +1,191 @@ +import logging +import time +from base64 import b64encode + +import pytest +import requests +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import fs_helpers, mce_helpers +from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port + +pytestmark = pytest.mark.integration_batch_2 + +FROZEN_TIME = "2024-07-12 12:00:00" + + +logger = logging.getLogger(__name__) + + +class GrafanaClient: + def __init__(self, url, admin_user, admin_password): + self.url = url + self.auth = (admin_user, admin_password) + self.headers = { + "Authorization": f"Basic {b64encode(f'{admin_user}:{admin_password}'.encode()).decode()}", + "Content-Type": "application/json", + } + + def create_service_account(self, name, role): + service_account_payload = {"name": name, "role": role, "isDisabled": False} + try: + response = requests.post( + f"{self.url}/api/serviceaccounts", + headers=self.headers, + json=service_account_payload, + ) + response.raise_for_status() + service_account = response.json() + return service_account + except 
requests.exceptions.RequestException as e: + logging.error(f"Error creating service account: {e}") + return None + + def create_api_key(self, service_account_id, key_name, role): + api_key_payload = {"name": key_name, "role": role} + try: + response = requests.post( + f"{self.url}/api/serviceaccounts/{service_account_id}/tokens", + headers=self.headers, + json=api_key_payload, + ) + response.raise_for_status() + api_key = response.json() + return api_key["key"] + except requests.exceptions.RequestException as e: + logging.error(f"Error creating API key: {e}") + return None + + +@pytest.fixture(scope="module") +def test_resources_dir(pytestconfig): + return pytestconfig.rootpath / "tests/integration/grafana" + + +@pytest.fixture(scope="module") +def test_api_key(): + # Example usage: + url = "http://localhost:3000" + admin_user = "admin" + admin_password = "admin" + + grafana_client = GrafanaClient(url, admin_user, admin_password) + + # Step 1: Create the service account + service_account = grafana_client.create_service_account( + name="example-service-account", role="Viewer" + ) + if service_account: + print(f"Service Account Created: {service_account}") + + # Step 2: Create the API key for the service account + api_key = grafana_client.create_api_key( + service_account_id=service_account["id"], + key_name="example-api-key", + role="Admin", + ) + if api_key: + print("Service Account API Key:", api_key) + return api_key + else: + print("Failed to create API key for the service account") + else: + print("Failed to create service account") + + +@pytest.fixture(scope="module") +def loaded_grafana(docker_compose_runner, test_resources_dir): + with docker_compose_runner( + test_resources_dir / "docker-compose.yml", "grafana" + ) as docker_services: + wait_for_port( + docker_services, + container_name="grafana", + container_port=3000, + timeout=300, + ) + yield docker_services + + # The Grafana image can be large, so we remove it after the test. + cleanup_image("grafana/grafana") + + +@freeze_time(FROZEN_TIME) +def test_grafana_dashboard(loaded_grafana, pytestconfig, tmp_path, test_resources_dir): + # Wait for Grafana to be up and running + url = "http://localhost:3000/api/health" + for i in range(30): + logging.info("waiting for Grafana to start...") + time.sleep(5) + resp = requests.get(url) + if resp.status_code == 200: + logging.info(f"Grafana started after waiting {i*5} seconds") + break + else: + pytest.fail("Grafana did not start in time") + + # Check if the default dashboard is loaded + dashboard_url = "http://localhost:3000/api/dashboards/uid/default" + resp = requests.get(dashboard_url, auth=("admin", "admin")) + assert resp.status_code == 200, "Failed to load default dashboard" + dashboard = resp.json() + + assert ( + dashboard["dashboard"]["title"] == "Default Dashboard" + ), "Default dashboard title mismatch" + assert any( + panel["type"] == "text" for panel in dashboard["dashboard"]["panels"] + ), "Default dashboard missing text panel" + + # Verify the output. 
(You can add further checks here if needed) + logging.info("Default dashboard verified successfully") + + +@freeze_time(FROZEN_TIME) +def test_grafana_ingest( + loaded_grafana, pytestconfig, tmp_path, test_resources_dir, test_api_key +): + # Wait for Grafana to be up and running + url = "http://localhost:3000/api/health" + for i in range(30): + logging.info("waiting for Grafana to start...") + time.sleep(5) + resp = requests.get(url) + if resp.status_code == 200: + logging.info(f"Grafana started after waiting {i*5} seconds") + break + else: + pytest.fail("Grafana did not start in time") + + # Run the metadata ingestion pipeline. + with fs_helpers.isolated_filesystem(tmp_path): + # Run grafana ingestion run. + pipeline = Pipeline.create( + { + "run_id": "grafana-test-simple", + "source": { + "type": "grafana", + "config": { + "url": "http://localhost:3000", + "service_account_token": test_api_key, + }, + }, + "sink": { + "type": "file", + "config": {"filename": "./grafana_mcps.json"}, + }, + } + ) + pipeline.run() + pipeline.raise_from_status() + + # Verify the output. + mce_helpers.check_golden_file( + pytestconfig, + output_path="grafana_mcps.json", + golden_path=test_resources_dir / "grafana_mcps_golden.json", + ignore_paths=[ + r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['last_event_time'\]", + ], + ) diff --git a/metadata-models/src/main/pegasus/com/linkedin/execution/ExecutionRequestResult.pdl b/metadata-models/src/main/pegasus/com/linkedin/execution/ExecutionRequestResult.pdl index 29acd0aa52389..606c3a06bc74b 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/execution/ExecutionRequestResult.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/execution/ExecutionRequestResult.pdl @@ -10,6 +10,10 @@ record ExecutionRequestResult { /** * The status of the execution request */ + @Searchable = { + "fieldType": "KEYWORD", + "fieldName": "executionResultStatus" + } status: string /** @@ -36,4 +40,4 @@ record ExecutionRequestResult { * Duration in milliseconds */ durationMs: optional long -} \ No newline at end of file +}
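
---

Note for reviewers: below is a minimal sketch of how the new `grafana` source introduced in this patch can be exercised locally, modeled directly on the `Pipeline.create` call in `tests/integration/grafana/test_grafana.py`. The URL, service account token, and output filename are illustrative placeholders, not values taken from this change.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Minimal pipeline wiring the new "grafana" source (registered in metadata-ingestion/setup.py)
# to a file sink, as the integration test in this patch does.
pipeline = Pipeline.create(
    {
        "run_id": "grafana-local-test",
        "source": {
            "type": "grafana",
            "config": {
                # GrafanaSourceConfig: base URL with no trailing slash plus a service account token.
                "url": "http://localhost:3000",
                "service_account_token": "<grafana-service-account-token>",  # placeholder
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "./grafana_mcps.json"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```

As in the test, the emitted file can then be compared against a golden file (or inspected by hand) to confirm that one `dashboardInfo` and one `status` aspect are produced per Grafana dashboard.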