Merge branch 'main' into test-case-graph-filter-ui-change
ShaileshParmar11 authored Sep 12, 2024
2 parents acb8ee3 + 59854de commit 6479851
Showing 86 changed files with 1,546 additions and 1,328 deletions.
@@ -4,4 +4,13 @@ INNER JOIN test_case tc ON dqdts.entityFQNHash = tc.fqnHash
SET dqdts.json = JSON_SET(dqdts.json,
'$.testCaseFQN', tc.json->'$.fullyQualifiedName',
'$.id', (SELECT UUID())
);
);

-- Add id column to data_quality_data_time_series table
-- after we have added the id values to the records
ALTER TABLE data_quality_data_time_series
ADD COLUMN id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
ADD CONSTRAINT UNIQUE (id);

-- Create index on id column
CREATE INDEX data_quality_data_time_series_id_index ON data_quality_data_time_series (id);
5 changes: 0 additions & 5 deletions bootstrap/sql/migrations/native/1.6.0/mysql/schemaChanges.sql
@@ -1,5 +0,0 @@
ALTER TABLE data_quality_data_time_series
ADD COLUMN id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
ADD CONSTRAINT UNIQUE (id);

CREATE INDEX data_quality_data_time_series_id_index ON data_quality_data_time_series (id);
@@ -6,3 +6,12 @@ SET json = jsonb_set(
)
FROM test_case tc
WHERE dqdts.entityfqnHash = tc.fqnHash;

-- Add id column to data_quality_data_time_series table
-- after we have added the id values to the records
ALTER TABLE data_quality_data_time_series
ADD COLUMN id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED,
ADD CONSTRAINT id_unique UNIQUE (id);

-- Create index on id column
CREATE INDEX IF NOT EXISTS data_quality_data_time_series_id_index ON data_quality_data_time_series (id);
@@ -1,5 +0,0 @@
ALTER TABLE data_quality_data_time_series
ADD COLUMN id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED,
ADD CONSTRAINT id_unique UNIQUE (id);

CREATE INDEX IF NOT EXISTS data_quality_data_time_series_id_index ON data_quality_data_time_series (id);
10 changes: 3 additions & 7 deletions ingestion/setup.py
@@ -45,7 +45,7 @@
"sqlalchemy-databricks": "sqlalchemy-databricks~=0.1",
"databricks-sdk": "databricks-sdk>=0.18.0,<0.20.0",
"trino": "trino[sqlalchemy]",
"spacy": "spacy~=3.7",
"spacy": "spacy<3.8",
"looker-sdk": "looker-sdk>=22.20.0",
"lkml": "lkml~=1.3",
"tableau": "tableau-api-lib~=0.1",
@@ -207,11 +207,8 @@
        *COMMONS["datalake"],
    },
    "datalake-s3": {
        # requires aiobotocore
        # https://github.com/fsspec/s3fs/blob/9bf99f763edaf7026318e150c4bd3a8d18bb3a00/requirements.txt#L1
        # however, the latest version of `s3fs` conflicts its `aiobotocore` dep with `boto3`'s dep on `botocore`.
        # Leaving this marked to the automatic resolution to speed up installation.
        "s3fs",
        # vendoring 'boto3' to keep all dependencies aligned (s3fs, boto3, botocore, aiobotocore)
        "s3fs[boto3]",
        *COMMONS["datalake"],
    },
    "deltalake": {"delta-spark<=2.3.0", "deltalake~=0.17"},
@@ -343,7 +340,6 @@
"coverage",
# Install GE because it's not in the `all` plugin
VERSIONS["great-expectations"],
"moto~=5.0",
"basedpyright~=1.14",
"pytest==7.0.0",
"pytest-cov",
1 change: 1 addition & 0 deletions ingestion/src/metadata/cli/app.py
@@ -32,6 +32,7 @@ def run_app(config_path: Path) -> None:

    try:
        config_dict = load_config_file(config_path)
        # no logging for config because apps might have custom secrets
        workflow = ApplicationWorkflow.create(config_dict)
    except Exception as exc:
        logger.error(f"Error running the application {exc}")
5 changes: 4 additions & 1 deletion ingestion/src/metadata/cli/dataquality.py
@@ -20,7 +20,7 @@
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.utils.logger import cli_logger, redacted_config
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

@@ -37,6 +37,9 @@ def run_test(config_path: Path) -> None:
    workflow_config_dict = None
    try:
        workflow_config_dict = load_config_file(config_path)
        logger.debug(
            "Using workflow config:\n%s", redacted_config(workflow_config_dict)
        )
        workflow = TestSuiteWorkflow.create(workflow_config_dict)
    except Exception as exc:
        logger.debug(traceback.format_exc())
3 changes: 2 additions & 1 deletion ingestion/src/metadata/cli/ingest.py
@@ -20,7 +20,7 @@
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.utils.logger import cli_logger, redacted_config
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

@@ -37,6 +37,7 @@ def run_ingest(config_path: Path) -> None:
    config_dict = None
    try:
        config_dict = load_config_file(config_path)
        logger.debug("Using workflow config:\n%s", redacted_config(config_dict))
        workflow = MetadataWorkflow.create(config_dict)
    except Exception as exc:
        logger.debug(traceback.format_exc())
3 changes: 2 additions & 1 deletion ingestion/src/metadata/cli/lineage.py
@@ -27,7 +27,7 @@
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.logger import cli_logger
from metadata.utils.logger import cli_logger, redacted_config
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

logger = cli_logger()
@@ -52,6 +52,7 @@ def run_lineage(config_path: Path) -> None:
    config_dict = None
    try:
        config_dict = load_config_file(config_path)
        logger.debug("Using workflow config:\n%s", redacted_config(config_dict))
        workflow = LineageWorkflow.model_validate(config_dict)

    except Exception as exc:
5 changes: 4 additions & 1 deletion ingestion/src/metadata/cli/profile.py
@@ -20,7 +20,7 @@
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.utils.logger import cli_logger, redacted_config
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

@@ -37,6 +37,9 @@ def run_profiler(config_path: Path) -> None:
    workflow_config_dict = None
    try:
        workflow_config_dict = load_config_file(config_path)
        logger.debug(
            "Using workflow config:\n%s", redacted_config(workflow_config_dict)
        )
        workflow = ProfilerWorkflow.create(workflow_config_dict)
    except Exception as exc:
        logger.debug(traceback.format_exc())
3 changes: 2 additions & 1 deletion ingestion/src/metadata/cli/usage.py
@@ -20,7 +20,7 @@
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.utils.logger import cli_logger
from metadata.utils.logger import cli_logger, redacted_config
from metadata.workflow.usage import UsageWorkflow
from metadata.workflow.workflow_init_error_handler import WorkflowInitErrorHandler

@@ -37,6 +37,7 @@ def run_usage(config_path: Path) -> None:
    config_dict = None
    try:
        config_dict = load_config_file(config_path)
        logger.debug("Using workflow config:\n%s", redacted_config(config_dict))
        workflow = UsageWorkflow.create(config_dict)
    except Exception as exc:
        logger.debug(traceback.format_exc())
15 changes: 13 additions & 2 deletions ingestion/src/metadata/data_quality/source/test_suite.py
@@ -22,6 +22,7 @@
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
    TestSuitePipeline,
)
@@ -36,6 +37,8 @@
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import import_source_class
from metadata.utils.logger import test_suite_logger

logger = test_suite_logger()
@@ -73,7 +76,7 @@ def _get_table_entity(self) -> Optional[Table]:
        table: Table = self.metadata.get_by_name(
            entity=Table,
            fqn=self.source_config.entityFullyQualifiedName.root,
            fields=["tableProfilerConfig", "testSuite"],
            fields=["tableProfilerConfig", "testSuite", "serviceType"],
        )

        return table
@@ -104,8 +107,16 @@ def test_connection(self) -> None:

    def _iter(self) -> Iterable[Either[TableAndTests]]:
        table: Table = self._get_table_entity()

        if table:
            source_type = table.serviceType.value.lower()
            if source_type.startswith(CUSTOM_CONNECTOR_PREFIX):
                logger.warning(
                    "Data quality tests might not work as expected with custom sources"
                )
            else:
                import_source_class(
                    service_type=ServiceType.Database, source_type=source_type
                )
            yield from self._process_table_suite(table)

        else:
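
The new branch in `_iter` warns when the table belongs to a custom connector and otherwise eagerly imports the packaged source class, so that source-level patches registered on import are in place before the tests run. A minimal, self-contained illustration of that guard, assuming `CUSTOM_CONNECTOR_PREFIX` resolves to the string `"custom"` (an assumption, not taken from this commit):

# Hypothetical stand-in for metadata.utils.constants.CUSTOM_CONNECTOR_PREFIX.
CUSTOM_CONNECTOR_PREFIX = "custom"

def should_import_source(service_type_value: str) -> bool:
    """Return True when the connector's packaged source module should be imported."""
    source_type = service_type_value.lower()
    # Custom connectors have no packaged source class to import, so skip them.
    return not source_type.startswith(CUSTOM_CONNECTOR_PREFIX)

# Example values only; in the workflow the value comes from table.serviceType.value.
assert should_import_source("Redshift") is True
assert should_import_source("CustomDatabase") is False
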
111 changes: 109 additions & 2 deletions ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py
@@ -16,12 +16,15 @@
import functools
import json
import traceback
from typing import Generic, Iterable, List, Optional, Set, Type, TypeVar
from typing import Generic, Iterable, Iterator, List, Optional, Set, Type, TypeVar
from urllib.parse import quote_plus

from pydantic import BaseModel
from pydantic import Field
from typing_extensions import Annotated

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.query import Query
from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import quote
from metadata.utils.elasticsearch import ES_INDEX_MAP
@@ -32,6 +35,42 @@
T = TypeVar("T", bound=BaseModel)


class TotalModel(BaseModel):
    """Elasticsearch total model"""

    relation: str
    value: int


class HitsModel(BaseModel):
    """Elasticsearch hits model"""

    index: Annotated[str, Field(description="Index name", alias="_index")]
    type: Annotated[str, Field(description="Type of the document", alias="_type")]
    id: Annotated[str, Field(description="Document ID", alias="_id")]
    score: Annotated[
        Optional[float], Field(description="Score of the document", alias="_score")
    ]
    source: Annotated[dict, Field(description="Document source", alias="_source")]
    sort: Annotated[
        List[str],
        Field(description="Sort field. Used internally to get the next page FQN"),
    ]


class ESHits(BaseModel):
    """Elasticsearch hits model"""

    total: Annotated[TotalModel, Field(description="Total matched elements")]
    hits: Annotated[List[HitsModel], Field(description="List of matched elements")]


class ESResponse(BaseModel):
    """Elasticsearch response model"""

    hits: ESHits


class ESMixin(Generic[T]):
    """
    OpenMetadata API methods related to Elasticsearch.
@@ -46,6 +85,12 @@ class ESMixin(Generic[T]):
        "&size={size}&index={index}"
    )

    # sort_field needs to be unique for the pagination to work, so we can use the FQN
    paginate_query = (
        "/search/query?q=&size={size}&deleted=false{filter}&index={index}"
        "&sort_field=fullyQualifiedName{after}"
    )

    @functools.lru_cache(maxsize=512)
    def _search_es_entity(
        self,
@@ -252,3 +297,65 @@ def es_get_queries_with_lineage(self, service_name: str) -> Optional[Set[str]]:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unknown error extracting results from ES query [{err}]")
            return None

    def paginate_es(
        self,
        entity: Type[T],
        query_filter: Optional[str] = None,
        size: int = 100,
        fields: Optional[List[str]] = None,
    ) -> Iterator[T]:
        """Paginate through the ES results, ignoring individual errors"""
        after: Optional[str] = None
        error_pages = 0
        query = functools.partial(
            self.paginate_query.format,
            index=ES_INDEX_MAP[entity.__name__],
            filter="&query_filter=" + quote_plus(query_filter) if query_filter else "",
            size=size,
        )
        while True:
            query_string = query(
                after="&search_after=" + quote_plus(after) if after else ""
            )
            response = self._get_es_response(query_string)

            # Allow 3 errors getting pages before getting out of the loop
            if not response:
                error_pages += 1
                if error_pages < 3:
                    continue
                else:
                    break

            # Get the data
            for hit in response.hits.hits:
                try:
                    yield self.get_by_name(
                        entity=entity,
                        fqn=hit.source["fullyQualifiedName"],
                        fields=fields,
                        nullable=False,  # Raise an error if we don't find the Entity
                    )
                except Exception as exc:
                    logger.warning(
                        f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
                    )

            # Get next page
            last_hit = response.hits.hits[-1] if response.hits.hits else None
            if not last_hit or not last_hit.sort:
                logger.info("No more pages to fetch")
                break

            after = ",".join(last_hit.sort)

    def _get_es_response(self, query_string: str) -> Optional[ESResponse]:
        """Get the Elasticsearch response"""
        try:
            response = self.client.get(query_string)
            return ESResponse.model_validate(response)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error while getting ES response: {exc}")
            return None
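
Taken together, the new Pydantic models parse the raw `/search/query` payload (note the `_index`/`_id`/`_source` aliases) and `paginate_es` walks the index with `search_after`, re-fetching every hit by fully qualified name. A rough usage sketch under stated assumptions: the sample payload, index name, and FQNs below are made up, and `metadata` is assumed to be an already-initialised `OpenMetadata` client.

from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.mixins.es_mixin import ESResponse

# Parsing a hand-written response shaped like the models above (values are illustrative).
sample = {
    "hits": {
        "total": {"value": 1, "relation": "eq"},
        "hits": [
            {
                "_index": "table_search_index",
                "_type": "_doc",
                "_id": "abc123",
                "_score": None,
                "_source": {"fullyQualifiedName": "svc.db.schema.orders"},
                "sort": ["svc.db.schema.orders"],
            }
        ],
    }
}
page = ESResponse.model_validate(sample)
assert page.hits.hits[0].source["fullyQualifiedName"] == "svc.db.schema.orders"

def print_all_table_fqns(metadata) -> None:
    """Stream every Table document from Elasticsearch, 50 FQNs per page."""
    for table in metadata.paginate_es(entity=Table, size=50):
        print(table.fullyQualifiedName.root)
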
@@ -83,6 +83,7 @@
    _get_schema_column_info,
    get_columns,
    get_table_comment,
    get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
    QueryByProcedure,
@@ -122,6 +123,7 @@
PGDialect._get_column_info = _get_pg_column_info  # pylint: disable=protected-access
RedshiftDialect.get_all_table_comments = get_all_table_comments
RedshiftDialect.get_table_comment = get_table_comment
RedshiftDialect.get_view_definition = get_view_definition
RedshiftDialect._get_all_relation_info = (  # pylint: disable=protected-access
    _get_all_relation_info
)
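
The Redshift source keeps patching SQLAlchemy's dialect at import time, and this commit additionally assigns `get_view_definition` onto the dialect alongside the existing table-comment and relation-info overrides. An illustrative sketch of that monkey-patching pattern (not the OpenMetadata implementation; the query and function body here are assumptions):

from sqlalchemy import text
from sqlalchemy_redshift.dialect import RedshiftDialect  # provided by sqlalchemy-redshift

def get_view_definition_sketch(self, connection, view_name, schema=None, **kw):
    """Hypothetical override: read the view SQL straight from pg_views."""
    result = connection.execute(
        text(
            "SELECT definition FROM pg_views "
            "WHERE schemaname = :schema AND viewname = :view"
        ),
        {"schema": schema or "public", "view": view_name},
    )
    row = result.first()
    return row[0] if row else None

# Assigning the function on the dialect class makes every SQLAlchemy Inspector
# created afterwards use the override, which is the pattern shown above.
RedshiftDialect.get_view_definition = get_view_definition_sketch
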
