Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/main' into forbid-trigger-system-app
Browse files Browse the repository at this point in the history

# Conflicts:
#	bootstrap/sql/migrations/native/1.5.0/mysql/postDataMigrationSQLScript.sql
#	bootstrap/sql/migrations/native/1.5.0/postgres/postDataMigrationSQLScript.sql
  • Loading branch information
sushi30 committed Jul 22, 2024
2 parents e61c6be + eafa6b8 commit c562d46
Show file tree
Hide file tree
Showing 134 changed files with 3,017 additions and 1,590 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Remove Duplicate Usernames and Lowercase Them
-- Step 1: deduplicate case-insensitively. Rank all rows sharing the same
-- LOWER(name) and delete every row except the one with the smallest id.
WITH cte AS (
SELECT
id,
ROW_NUMBER() OVER (PARTITION BY LOWER(JSON_UNQUOTE(JSON_EXTRACT(json, '$.name'))) ORDER BY id) as rn
FROM
user_entity
)
DELETE FROM user_entity
WHERE id IN (
SELECT id
FROM cte
WHERE rn > 1
);

-- Step 2: normalize the surviving usernames to lowercase inside the JSON payload.
-- NOTE(review): runs unconditionally on every row; if '$.name' were missing,
-- JSON_SET would store a JSON null at that path (the document itself is not
-- destroyed) -- assumed every user_entity row carries a name, confirm upstream.
UPDATE user_entity
SET json = JSON_SET(
json,
'$.name',
LOWER(JSON_UNQUOTE(JSON_EXTRACT(json, '$.name')))
);

-- Remove Duplicate Emails and Lowercase Them
-- Same two-step strategy as above, keyed on the case-insensitive email.
WITH cte AS (
SELECT
id,
ROW_NUMBER() OVER (PARTITION BY LOWER(JSON_UNQUOTE(JSON_EXTRACT(json, '$.email'))) ORDER BY id) as rn
FROM
user_entity
)
DELETE FROM user_entity
WHERE id IN (
SELECT id
FROM cte
WHERE rn > 1
);

-- Lowercase the surviving emails inside the JSON payload (same caveat as names).
UPDATE user_entity
SET json = JSON_SET(
json,
'$.email',
LOWER(JSON_UNQUOTE(JSON_EXTRACT(json, '$.email')))
);
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Remove Duplicate UserNames and lowercase them
-- Step 1: deduplicate case-insensitively. Rank all rows sharing the same
-- LOWER(name) and delete every row except the one with the smallest id.
WITH cte AS (
SELECT
id,
ROW_NUMBER() OVER (PARTITION BY to_jsonb(LOWER(json->>'name')) ORDER BY id) as rn
FROM
user_entity
)
DELETE from user_entity
WHERE id IN (
SELECT id
FROM cte
WHERE rn > 1
);

-- Step 2: lowercase the surviving usernames inside the JSON payload.
-- jsonb_set() is STRICT: if json->>'name' is NULL, to_jsonb(LOWER(NULL)) is
-- SQL NULL and the whole json column would be set to NULL, destroying the
-- user document. Guard with a WHERE clause so such rows are left untouched.
UPDATE user_entity
SET json = jsonb_set(
json,
'{name}',
to_jsonb(LOWER(json->>'name'))
)
WHERE json->>'name' IS NOT NULL;

-- Remove Duplicate Emails and lowercase them
-- Same two-step strategy as above, keyed on the case-insensitive email.
WITH cte AS (
SELECT
id,
ROW_NUMBER() OVER (PARTITION BY to_jsonb(LOWER(json->>'email')) ORDER BY id) as rn
FROM
user_entity
)
DELETE from user_entity
WHERE id IN (
SELECT id
FROM cte
WHERE rn > 1
);

-- Lowercase the surviving emails, with the same NULL guard as for names.
UPDATE user_entity
SET json = jsonb_set(
json,
'{email}',
to_jsonb(LOWER(json->>'email'))
)
WHERE json->>'email' IS NOT NULL;
Empty file.
7 changes: 5 additions & 2 deletions bootstrap/sql/migrations/native/1.5.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ CREATE TABLE IF NOT EXISTS api_endpoint_entity (
INDEX (name)
);


-- Clean dangling workflows not removed after test connection
truncate automations_workflow;

-- Remove date, dateTime, time from type_entity, as they are no more om-field-types, instead we have date-cp, time-cp, dateTime-cp as om-field-types
DELETE FROM type_entity
WHERE name IN ('date', 'dateTime', 'time');


-- Update BigQuery,Bigtable & Datalake model for gcpCredentials to move `gcpConfig` value to `gcpConfig.path`
UPDATE dbservice_entity
SET json = JSON_INSERT(
Expand Down Expand Up @@ -144,4 +147,4 @@ SET json = JSON_INSERT(
JSON_EXTRACT(json, '$.sourceConfig.config.dbtConfigSource.dbtSecurityConfig.gcpConfig.type') OR
JSON_EXTRACT(json, '$.sourceConfig.config.dbtConfigSource.dbtSecurityConfig.gcpConfig.externalType') OR
JSON_EXTRACT(json, '$.sourceConfig.config.dbtConfigSource.dbtSecurityConfig.gcpConfig.path')
) is NULL AND JSON_EXTRACT(json, '$.sourceConfig.config.dbtConfigSource.dbtSecurityConfig.gcpConfig') is not null;
) is NULL AND JSON_EXTRACT(json, '$.sourceConfig.config.dbtConfigSource.dbtSecurityConfig.gcpConfig') is not null;
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ CREATE TABLE IF NOT EXISTS api_endpoint_entity (
UNIQUE (fqnHash)
);


-- Clean dangling workflows not removed after test connection
truncate automations_workflow;

-- Remove date, dateTime, time from type_entity, as they are no more om-field-types, instead we have date-cp, time-cp, dateTime-cp as om-field-types
DELETE FROM type_entity
WHERE name IN ('date', 'dateTime', 'time');
Expand Down Expand Up @@ -133,4 +137,4 @@ SET json = jsonb_set(
AND json#>>'{sourceConfig,config,dbtConfigSource,dbtSecurityConfig,gcpConfig}' IS NOT NULL
AND json#>>'{sourceConfig,config,dbtConfigSource,dbtSecurityConfig,gcpConfig,type}' IS NULL
AND json#>>'{sourceConfig,config,dbtConfigSource,dbtSecurityConfig,gcpConfig,externalType}' IS NULL
AND json#>>'{sourceConfig,config,dbtConfigSource,dbtSecurityConfig,gcpConfig,path}' IS NULL;
AND json#>>'{sourceConfig,config,dbtConfigSource,dbtSecurityConfig,gcpConfig,path}' IS NULL;
1 change: 1 addition & 0 deletions conf/openmetadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ authenticationConfiguration:
clientAuthenticationMethod: ${OIDC_CLIENT_AUTH_METHOD:-"client_secret_post"}
tenant: ${OIDC_TENANT:-""}
maxClockSkew: ${OIDC_MAX_CLOCK_SKEW:-""}
tokenValidity: ${OIDC_OM_REFRESH_TOKEN_VALIDITY:-"3600"} # in seconds
customParams: ${OIDC_CUSTOM_PARAMS:-}
samlConfiguration:
debugMode: ${SAML_DEBUG_MODE:-false}
Expand Down
2 changes: 1 addition & 1 deletion ingestion/operators/docker/Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1

RUN pip install --upgrade pip setuptools
RUN pip install --upgrade pip setuptools==69.0.2

ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[airflow]"
Expand Down
4 changes: 2 additions & 2 deletions ingestion/src/metadata/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pathlib import Path

# pyright: reportUnusedCallResult=false
from typing import List, Optional
from typing import List, Optional, Union

from metadata.__version__ import get_metadata_version
from metadata.cli.app import run_app
Expand Down Expand Up @@ -157,7 +157,7 @@ def metadata(args: Optional[List[str]] = None):
if contains_args.get("debug"):
set_loggers_level(logging.DEBUG)
else:
log_level: str = contains_args.get("log_level", logging.INFO)
log_level: Union[str, int] = contains_args.get("log_level") or logging.INFO
set_loggers_level(log_level)

if path and metadata_workflow and metadata_workflow in RUN_PATH_METHODS:
Expand Down
2 changes: 1 addition & 1 deletion ingestion/src/metadata/data_quality/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class TableAndTests(BaseModel):

table: Table = Field(None, description="Table being processed by the DQ workflow")
service_type: str = Field(..., description="Service type the table belongs to")
test_cases: Optional[List[TestCase]] = Field(
test_cases: List[TestCase] = Field(
None, description="Test Cases already existing in the Test Suite, if any"
)
executable_test_suite: Optional[CreateTestSuiteRequest] = Field(
Expand Down
32 changes: 12 additions & 20 deletions ingestion/src/metadata/data_quality/processor/test_case_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""
import traceback
from copy import deepcopy
from typing import List, Optional, cast
from typing import List, Optional

from metadata.data_quality.api.models import (
TableAndTests,
Expand Down Expand Up @@ -90,15 +90,6 @@ def _run(self, record: TableAndTests) -> Either:
),
table_fqn=record.table.fullyQualifiedName.root,
)

if not test_cases:
return Either(
left=StackTraceError(
name="No test Cases",
error=f"No tests cases found for table {record.table.fullyQualifiedName.root}",
)
)

openmetadata_test_cases = self.filter_for_om_test_cases(test_cases)
openmetadata_test_cases = self.filter_incompatible_test_cases(
record.table, openmetadata_test_cases
Expand All @@ -111,6 +102,12 @@ def _run(self, record: TableAndTests) -> Either:
record.table,
).get_data_quality_runner()

logger.debug(
f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}"
)
if len(openmetadata_test_cases) == 0:
logger.warning("No test cases found for the table")

test_results = [
test_case_result
for test_case in openmetadata_test_cases
Expand All @@ -120,17 +117,14 @@ def _run(self, record: TableAndTests) -> Either:
return Either(right=TestCaseResults(test_results=test_results))

def get_test_cases(
self, test_cases: Optional[List[TestCase]], test_suite_fqn: str, table_fqn: str
self, test_cases: List[TestCase], test_suite_fqn: str, table_fqn: str
) -> List[TestCase]:
"""
Based on the test suite test cases that we already know, pick up
the rest from the YAML config, compare and create the new ones
"""
if self.processor_config.testCases is not None:
cli_test_cases = self.get_test_case_from_cli_config() # type: ignore
cli_test_cases = cast(
List[TestCaseDefinition], cli_test_cases
) # satisfy type checker
cli_test_cases = self.get_test_case_from_cli_config()
return self.compare_and_create_test_cases(
cli_test_cases_definitions=cli_test_cases,
test_cases=test_cases,
Expand All @@ -142,15 +136,13 @@ def get_test_cases(

def get_test_case_from_cli_config(
    self,
) -> List[TestCaseDefinition]:
    """Get all the test cases names defined in the CLI config file.

    Returns an empty list (never None) when ``processor_config.testCases``
    is unset, so callers can iterate without a None check.
    """
    # `or []` covers both a missing (None) and an explicitly empty config.
    return list(self.processor_config.testCases or [])

def compare_and_create_test_cases(
self,
cli_test_cases_definitions: Optional[List[TestCaseDefinition]],
cli_test_cases_definitions: List[TestCaseDefinition],
test_cases: List[TestCase],
table_fqn: str,
test_suite_fqn: str,
Expand Down
5 changes: 2 additions & 3 deletions ingestion/src/metadata/data_quality/source/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _get_table_entity(self) -> Optional[Table]:

def _get_test_cases_from_test_suite(
self, test_suite: Optional[TestSuite]
) -> Optional[List[TestCase]]:
) -> List[TestCase]:
"""Return test cases if the test suite exists and has them"""
if test_suite:
test_cases = self.metadata.list_all_entities(
Expand All @@ -94,8 +94,7 @@ def _get_test_cases_from_test_suite(
t for t in test_cases if t.name in self.source_config.testCases
]
return test_cases

return None
return []

def prepare(self):
"""Nothing to prepare"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""

import traceback
from typing import Any, Iterable, List, Optional, Tuple, Union
from typing import Any, Iterable, Optional, Tuple, Union

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
Expand Down Expand Up @@ -57,6 +57,7 @@
from metadata.ingestion.source.database.domodatabase.models import (
OutputDataset,
Owner,
Schema,
SchemaColumn,
User,
)
Expand Down Expand Up @@ -191,11 +192,7 @@ def yield_table(
try:
table_constraints = None
table_object = OutputDataset(**self.domo_client.datasets.get(table_id))
columns = (
self.get_columns(table_object.schemas.columns)
if table_object.columns
else []
)
columns = self.get_columns(table_object)
table_request = CreateTableRequest(
name=EntityName(table_object.name),
displayName=table_object.name,
Expand Down Expand Up @@ -228,19 +225,57 @@ def yield_table(
)
)

def get_columns(self, table_object: List[SchemaColumn]):
def get_columns_from_federated_dataset(self, table_name: str, dataset_id: str) -> Optional[Schema]:
    """
    Fetch column metadata for a federated Domo dataset by querying it directly.

    Federated datasets may not expose a schema through the regular dataset
    API, so we issue a 1-row probe SELECT and read column names and types
    from the query response instead.

    Returns a Schema on success, or None when the query fails or yields
    no columns (callers must handle the None case).
    """
    try:
        # Probe query: LIMIT 1 fetches a single row so the response carries
        # column names and type metadata without pulling the whole dataset.
        # NOTE(review): table_name is interpolated into the SQL string;
        # assumed to come from the trusted Domo API, not user input — confirm.
        sql_query = f'SELECT * FROM "{table_name}" LIMIT 1'
        schema_columns = []
        response = self.domo_client.datasets.query(dataset_id, sql_query)
        if response:
            # Pair each column name with its type descriptor by position.
            # NOTE(review): assumes response["metadata"] is at least as long
            # as response["columns"] — confirm against the Domo client docs.
            for i, column_name in enumerate(response["columns"] or []):
                schema_column = SchemaColumn(
                    name=column_name, type=response["metadata"][i]["type"]
                )
                schema_columns.append(schema_column)
            if schema_columns:
                return Schema(columns=schema_columns)
    except Exception as exc:
        # Best-effort: log and fall through to None rather than failing ingestion.
        logger.debug(traceback.format_exc())
        logger.warning(
            f"Error while fetching columns from federated dataset {table_name} - {exc}"
        )
    return None

def get_columns(self, table_object: OutputDataset):
"""
Method to get domo table columns
"""
row_order = 1
columns = []
for column in table_object:
columns.append(
Column(
name=ColumnName(column.name),
description=column.description,
dataType=column.type,
ordinalPosition=row_order,
)
if not table_object.schemas or not table_object.schemas.columns:
table_object.schemas = self.get_columns_from_federated_dataset(
table_name=table_object.name, dataset_id=table_object.id
)
row_order += 1

for column in table_object.schemas.columns or []:
try:
columns.append(
Column(
name=ColumnName(column.name),
description=column.description,
dataType=column.type,
ordinalPosition=row_order,
)
)
row_order += 1
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error while fetching details of column {column} - {exc}"
)
return columns

def yield_tag(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DomoDatabaseBaseModel(BaseModel):


class User(DomoDatabaseBaseModel):
id: int
email: str
role: str

Expand Down
Loading

0 comments on commit c562d46

Please sign in to comment.