Merge remote-tracking branch 'origin/main' into cypress-aut-failure-part-5
ShaileshParmar11 committed Nov 10, 2023
2 parents 89976a9 + de74e1a commit 74e23d2
Showing 785 changed files with 3,008 additions and 3,577 deletions.
11 changes: 11 additions & 0 deletions bootstrap/sql/migrations/native/1.2.1/mysql/schemaChanges.sql
@@ -0,0 +1,11 @@

--update the timestamps to millis for dbt test results
UPDATE data_quality_data_time_series dqdts
SET dqdts.json = JSON_INSERT(
    JSON_REMOVE(dqdts.json, '$.timestamp'),
    '$.timestamp',
    JSON_EXTRACT(dqdts.json, '$.timestamp') * 1000
)
WHERE dqdts.extension = 'testCase.testCaseResult'
  AND JSON_EXTRACT(dqdts.json, '$.timestamp') REGEXP '^[0-9]{10}$'
;
13 changes: 12 additions & 1 deletion bootstrap/sql/migrations/native/1.2.1/postgres/schemaChanges.sql
@@ -8,4 +8,15 @@ SET json = jsonb_set(
true
)
WHERE json #>> '{pipelineType}' = 'metadata'
AND json #>> '{sourceConfig,config,type}' = 'DatabaseMetadata';


--update the timestamps to millis for dbt test results
UPDATE data_quality_data_time_series dqdts
SET json = jsonb_set(
    dqdts.json::jsonb,
    '{timestamp}',
    to_jsonb(((dqdts.json ->> 'timestamp')::bigint)*1000)
)
WHERE dqdts.extension = 'testCase.testCaseResult'
  AND (json->>'timestamp') ~ '^[0-9]{10}$';
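
Both migrations apply the same guard: only timestamp values that are exactly ten digits long (a seconds-resolution epoch) get multiplied by 1000, so re-running the migration leaves already-converted millisecond values untouched. A minimal Python sketch of that idempotency check (a hypothetical helper, not part of this commit):

import re

def to_millis(raw_ts: str) -> int:
    # Ten digits means a seconds-resolution epoch; scale it up.
    if re.fullmatch(r"[0-9]{10}", raw_ts):
        return int(raw_ts) * 1000
    # Anything else is assumed to already be in milliseconds.
    return int(raw_ts)

assert to_millis("1699574400") == 1699574400000
assert to_millis("1699574400000") == 1699574400000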
32 changes: 17 additions & 15 deletions ingestion/src/metadata/cli/db_dump.py
@@ -16,10 +16,10 @@
import json
from functools import singledispatch
from pathlib import Path
from typing import List, Optional, Union
from typing import Iterable, List, Optional, Union

from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine
from sqlalchemy.engine import Engine, Row

from metadata.utils.constants import UTF_8

@@ -121,6 +121,13 @@ def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]:
return None


def run_query_iter(engine: Engine, query: str) -> Iterable[Row]:
    """Return a generator of rows, one row at a time, keeping at most 100 rows in memory"""
    for row in engine.execute(text(query)).yield_per(100):
        yield row


def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
"""
Dumps JSON data.
@@ -135,14 +142,10 @@ def dump_json(tables: List[str], engine: Engine, output: Path) -> None:

hash_column_name = get_hash_column_name(engine=engine, table_name=table)
if hash_column_name:
res = engine.execute(
text(
STATEMENT_HASH_JSON.format(
table=table, hash_column_name=hash_column_name
)
)
).all()
for row in res:
query = STATEMENT_HASH_JSON.format(
table=table, hash_column_name=hash_column_name
)
for row in run_query_iter(engine=engine, query=query):
insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n" # pylint: disable=line-too-long
file.write(insert)
else:
@@ -161,8 +164,8 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
truncate = STATEMENT_TRUNCATE.format(table=table)
file.write(truncate)

res = engine.execute(text(STATEMENT_ALL.format(table=table))).all()
for row in res:
query = STATEMENT_ALL.format(table=table)
for row in run_query_iter(engine=engine, query=query):
data = ",".join(clean_col(col, engine) for col in row)

insert = f"INSERT INTO {table} VALUES ({data});\n"
@@ -180,16 +183,15 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:

columns = inspector.get_columns(table_name=table)

statement = STATEMENT_ALL_NEW.format(
query = STATEMENT_ALL_NEW.format(
cols=",".join(
col["name"]
for col in columns
if col["name"] not in data["exclude_columns"]
),
table=table,
)
res = engine.execute(text(statement)).all()
for row in res:
for row in run_query_iter(engine=engine, query=query):
# Let's use .format here to not add more variables
# pylint: disable=consider-using-f-string
insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(
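
The run_query_iter helper introduced above replaces the previous engine.execute(...).all() calls, so rows stream through yield_per(100) in batches instead of being fully materialized in memory. A hedged usage sketch (the query is illustrative, not from this commit):

# Rows arrive in batches of 100, keeping memory flat on large tables.
for row in run_query_iter(engine=engine, query="SELECT json FROM table_entity"):
    print(row)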
7 changes: 7 additions & 0 deletions ingestion/src/metadata/ingestion/lineage/sql_lineage.py
@@ -34,6 +34,7 @@

logger = utils_logger()
LRU_CACHE_SIZE = 4096
DEFAULT_SCHEMA_NAME = "<default>"


def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
@@ -145,6 +146,12 @@ def get_table_fqn_from_query_name(
empty_list * (3 - len(split_table))
) + split_table

if schema_query == DEFAULT_SCHEMA_NAME:
schema_query = None

if database_query == DEFAULT_SCHEMA_NAME:
database_query = None

return database_query, schema_query, table


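
For context, get_table_fqn_from_query_name left-pads a dotted query name out to the (database, schema, table) triple, and with this change the literal "<default>" placeholder is mapped to None. A simplified standalone sketch of that behavior (a hypothetical reproduction, not the module's actual code):

DEFAULT_SCHEMA_NAME = "<default>"

def split_query_name(name: str):
    parts = name.split(".")
    # Left-pad with None so unpacking always yields three values.
    database, schema, table = [None] * (3 - len(parts)) + parts
    if schema == DEFAULT_SCHEMA_NAME:
        schema = None
    if database == DEFAULT_SCHEMA_NAME:
        database = None
    return database, schema, table

assert split_query_name("<default>.orders") == (None, None, "orders")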
23 changes: 23 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py
@@ -30,6 +30,7 @@
PatchValue,
)
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.deprecation import deprecated
from metadata.utils.logger import ometa_logger

logger = ometa_logger()
@@ -44,6 +45,7 @@ class GlossaryMixin(OMetaPatchMixinBase):
To be inherited by OpenMetadata
"""

@deprecated(message="Use metadata.create_or_update instead", release="1.3")
def create_glossary(self, glossaries_body):
"""Method to create new Glossary
Args:
@@ -54,6 +56,7 @@ def create_glossary(self, glossaries_body):
)
logger.info(f"Created a Glossary: {resp}")

@deprecated(message="Use metadata.create_or_update instead", release="1.3")
def create_glossary_term(self, glossary_term_body):
"""Method to create new Glossary Term
Args:
@@ -64,6 +67,10 @@ def create_glossary_term(self, glossary_term_body):
)
logger.info(f"Created a Glossary Term: {resp}")

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_glossary_term_parent(
self,
entity_id: Union[str, basic.Uuid],
@@ -133,6 +140,10 @@ def patch_glossary_term_parent(

return None

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_glossary_term_related_terms(
self,
entity_id: Union[str, basic.Uuid],
@@ -221,6 +232,10 @@ def patch_glossary_term_related_terms(

return None

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_reviewers(
self,
entity: Union[Type[Glossary], Type[GlossaryTerm]],
@@ -307,6 +322,10 @@ def patch_reviewers(

return None

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_glossary_term_synonyms(
self,
entity_id: Union[str, basic.Uuid],
@@ -385,6 +404,10 @@ def patch_glossary_term_synonyms(

return None

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_glossary_term_references(
self,
entity_id: Union[str, basic.Uuid],
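
Each of the glossary methods above now carries the deprecated decorator from metadata.utils.deprecation with a message and a target release. A minimal sketch of what such a decorator presumably does (hypothetical; the real implementation may differ):

import warnings
from functools import wraps

def deprecated(message: str, release: str):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            warnings.warn(
                f"{func.__name__} is deprecated and will be removed in {release}. {message}",
                DeprecationWarning,
                stacklevel=2,
            )
            return func(*args, **kwargs)
        return wrapper
    return decorator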
13 changes: 7 additions & 6 deletions ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py
@@ -177,9 +177,10 @@ def add_lineage_by_query(
timeout_seconds=timeout,
)
for lineage_request in add_lineage_request or []:
resp = self.add_lineage(lineage_request)
entity_name = resp.get("entity", {}).get("name")
for node in resp.get("nodes", []):
logger.info(
f"added lineage between table {node.get('name')} and {entity_name} "
)
if lineage_request.right:
resp = self.add_lineage(lineage_request.right)
entity_name = resp.get("entity", {}).get("name")
for node in resp.get("nodes", []):
logger.info(
f"added lineage between table {node.get('name')} and {entity_name} "
)
@@ -55,11 +55,14 @@ class OMetaMlModelMixin(OMetaLineageMixin):

client: REST

def add_mlmodel_lineage(self, model: MlModel) -> Dict[str, Any]:
def add_mlmodel_lineage(
self, model: MlModel, description: Optional[str] = None
) -> Dict[str, Any]:
"""
Iterates over MlModel's Feature Sources and
add the lineage information.
:param model: MlModel containing EntityReferences
:param description: Lineage description
:return: List of added lineage information
"""

@@ -77,8 +80,8 @@ def add_mlmodel_lineage(self, model: MlModel) -> Dict[str, Any]:
for entity_ref in refs:
self.add_lineage(
AddLineageRequest(
description="MlModel uses FeatureSource",
edge=EntitiesEdge(
description=description,
fromEntity=entity_ref,
toEntity=self.get_entity_reference(
entity=MlModel, fqn=model.fullyQualifiedName
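
add_mlmodel_lineage now accepts an optional description instead of always writing the hard-coded "MlModel uses FeatureSource" string. A usage sketch, assuming an OpenMetadata client named metadata and an MlModel already fetched:

# description is optional; omitting it leaves the lineage edge undescribed
# rather than falling back to the old hard-coded text.
metadata.add_mlmodel_lineage(
    model=my_model,
    description="MlModel uses FeatureSource",
)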
25 changes: 23 additions & 2 deletions ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
@@ -25,6 +25,7 @@
)
from metadata.generated.schema.entity.automations.workflow import WorkflowStatus
from metadata.generated.schema.entity.data.table import Column, Table, TableConstraint
from metadata.generated.schema.entity.domains.domain import Domain
from metadata.generated.schema.entity.services.connections.testConnectionResult import (
TestConnectionResult,
)
@@ -485,7 +486,9 @@ def patch_automation_workflow_response(
f"Error trying to PATCH status for automation workflow [{model_str(automation_workflow)}]: {exc}"
)

def patch_life_cycle(self, entity: Entity, life_cycle: LifeCycle) -> None:
def patch_life_cycle(
self, entity: Entity, life_cycle: LifeCycle
) -> Optional[Entity]:
"""
Patch life cycle data for an entity
@@ -495,9 +498,27 @@ def patch_life_cycle(self, entity: Entity, life_cycle: LifeCycle) -> None:
try:
destination = entity.copy(deep=True)
destination.lifeCycle = life_cycle
self.patch(entity=type(entity), source=entity, destination=destination)
return self.patch(
entity=type(entity), source=entity, destination=destination
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to Patch life cycle data for {entity.fullyQualifiedName.__root__}: {exc}"
)
return None

def patch_domain(self, entity: Entity, domain: Domain) -> Optional[Entity]:
"""Patch domain data for an Entity"""
try:
destination: Entity = entity.copy(deep=True)
destination.domain = EntityReference(id=domain.id, type="domain")
return self.patch(
entity=type(entity), source=entity, destination=destination
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error trying to Patch Domain for {entity.fullyQualifiedName.__root__}: {exc}"
)
return None
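
With this change patch_life_cycle, like the new patch_domain, returns the patched entity rather than None on success. A usage sketch under the same assumptions as above (metadata is an OpenMetadata client, table and domain are fetched entities):

patched = metadata.patch_domain(entity=table, domain=domain)
if patched is None:
    # The PATCH failed; the mixin already logged the warning.
    pass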
@@ -30,6 +30,7 @@
PatchValue,
)
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.deprecation import deprecated
from metadata.utils.logger import ometa_logger

logger = ometa_logger()
@@ -129,6 +130,10 @@ def _get_optional_rule_patch(
]
return data

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_role_policy(
self,
entity_id: Union[str, basic.Uuid],
@@ -261,6 +266,10 @@ def patch_role_policy(

return None

@deprecated(
message="Use metadata.patch instead as the new standard method that will create the jsonpatch dynamically",
release="1.3",
)
def patch_policy_rule(
self,
entity_id: Union[str, basic.Uuid],
1 change: 1 addition & 0 deletions ingestion/src/metadata/ingestion/ometa/ometa_api.py
@@ -239,6 +239,7 @@ def get_entity_from_create(self, create: Type[C]) -> Type[T]:
.replace("searchindex", "searchIndex")
.replace("storedprocedure", "storedProcedure")
.replace("ingestionpipeline", "ingestionPipeline")
.replace("dataproduct", "dataProduct")
)
class_path = ".".join(
filter(
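
get_entity_from_create derives the entity class path by stripping and lowercasing the create-request class name, so camelCase entity names have to be restored one by one in the replace chain. A simplified standalone illustration of the new dataProduct case (hypothetical code, not the method's actual logic):

name = "CreateDataProductRequest".replace("Create", "").replace("Request", "").lower()
name = name.replace("dataproduct", "dataProduct")  # the replacement added here
assert name == "dataProduct"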
11 changes: 11 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/routes.py
@@ -46,6 +46,10 @@
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.domains.createDataProduct import (
CreateDataProductRequest,
)
from metadata.generated.schema.api.domains.createDomain import CreateDomainRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.policies.createPolicy import CreatePolicyRequest
from metadata.generated.schema.api.services.createDashboardService import (
@@ -107,6 +111,8 @@
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.domains.dataProduct import DataProduct
from metadata.generated.schema.entity.domains.domain import Domain
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.services.connections.testConnectionDefinition import (
TestConnectionDefinition,
@@ -214,4 +220,9 @@
WebAnalyticEventData.__name__: "/analytics/web/events/collect",
DataInsightChart.__name__: "/analytics/dataInsights/charts",
Kpi.__name__: "/kpi",
# Domains & Data Products
Domain.__name__: "/domains",
CreateDomainRequest.__name__: "/domains",
DataProduct.__name__: "/dataProducts",
CreateDataProductRequest.__name__: "/dataProducts",
}
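
With these entries both the entity classes and their create requests resolve to REST collection paths. A lookup sketch, assuming the mapping above is the module's ROUTES dict:

# Hypothetical lookup; the dict maps class names to API path suffixes.
assert ROUTES[Domain.__name__] == "/domains"
assert ROUTES[CreateDataProductRequest.__name__] == "/dataProducts"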
@@ -86,6 +86,7 @@
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
from metadata.utils.time_utils import convert_timestamp_to_milliseconds

logger = ingestion_logger()

@@ -845,7 +846,9 @@ def add_dbt_test_result(self, dbt_test: dict):
testCaseFailureReason=None,
testCaseFailureComment=None,
updatedAt=Timestamp(
__root__=int(datetime.utcnow().timestamp() * 1000)
__root__=convert_timestamp_to_milliseconds(
datetime.utcnow().timestamp()
)
),
updatedBy=None,
)
@@ -863,7 +866,7 @@ def add_dbt_test_result(self, dbt_test: dict):
dbt_timestamp = self.context.run_results_generate_time.timestamp()
# Create the test case result object
test_case_result = TestCaseResult(
timestamp=dbt_timestamp,
timestamp=convert_timestamp_to_milliseconds(dbt_timestamp),
testCaseStatus=test_case_status,
testResultValue=[
TestResultValue(
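
Both call sites now route through convert_timestamp_to_milliseconds, so dbt test results are stored at millisecond precision, matching the 1.2.1 migrations above. A minimal sketch of what such a helper presumably does (hypothetical; the real one lives in metadata.utils.time_utils):

def convert_timestamp_to_milliseconds(timestamp: float) -> int:
    # Scale a seconds-resolution epoch, possibly fractional, to milliseconds.
    return int(round(timestamp * 1000))

assert convert_timestamp_to_milliseconds(1699574400.5) == 1699574400500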
