Commit
Merge remote-tracking branch 'origin/main' into create-admin-page-fix
ShaileshParmar11 committed Nov 5, 2024
2 parents a4a6525 + a6b5fdc commit 53b3032
Showing 318 changed files with 4,386 additions and 571 deletions.
11 changes: 4 additions & 7 deletions .github/workflows/py-cli-e2e-tests.yml
@@ -20,12 +20,12 @@ on:
        required: True
        default: '["bigquery", "dbt_redshift", "metabase", "mssql", "mysql", "redash", "snowflake", "tableau", "powerbi", "vertica", "python", "redshift", "quicksight", "datalake_s3", "postgres", "oracle", "athena", "bigquery_multiple_project"]'
      debug:
-        description: "If Debugging the Pipeline, Slack and Sonar events won't be triggered [default, true or false]"
+        description: "If Debugging the Pipeline, Slack and Sonar events won't be triggered [default, true or false]. Default will trigger only on main branch."
        required: False
        default: "default"

env:
-  DEBUG: ${{ inputs.debug == 'true' || (inputs.debug == 'default' && github.ref != 'refs/heads/main') }}
+  DEBUG: "${{ (inputs.debug == 'default' && github.ref == 'refs/heads/main' && 'false') || (inputs.debug == 'default' && github.ref != 'refs/heads/main' && 'true') || inputs.debug || 'false' }}"

permissions:
  id-token: write
@@ -38,7 +38,7 @@ jobs:
    outputs:
      DEBUG: ${{ env.DEBUG }}
    steps:
-      - run: echo "null"
+      - run: echo "INPUTS_DEBUG=${{ inputs.debug }}, GITHUB_REF=${{ github.ref }}, DEBUG=$DEBUG"

  py-cli-e2e-tests:
    runs-on: ubuntu-latest
@@ -189,16 +189,13 @@ jobs:
          docker compose down --remove-orphans
          sudo rm -rf ${PWD}/docker-volume
-      - uses: austenstone/job-id@v1
-        id: job-id

      - name: Slack on Failure
        if: steps.e2e-test.outcome != 'success' && steps.python-e2e-test.outcome != 'success' && env.DEBUG == 'false'
        uses: slackapi/slack-github-action@v1.23.0
        with:
          payload: |
            {
-              "text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥\nLogs: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}/job/${{ steps.job-id.outputs.job-id }}"
+              "text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥\nLogs: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.E2E_SLACK_WEBHOOK }}
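The reworked DEBUG expression reads most easily as a decision table. A minimal Python model of how GitHub Actions resolves it (illustrative only, not part of the commit; the function name and cases are ours):

def resolve_debug(debug_input: str, ref: str) -> str:
    # 'default' defers to the branch: main runs with DEBUG=false,
    # every other ref runs with DEBUG=true
    if debug_input == "default":
        return "false" if ref == "refs/heads/main" else "true"
    # any explicit input wins; an empty input falls back to 'false'
    return debug_input or "false"

assert resolve_debug("default", "refs/heads/main") == "false"
assert resolve_debug("default", "refs/heads/feature-x") == "true"
assert resolve_debug("true", "refs/heads/main") == "true"
assert resolve_debug("false", "refs/heads/feature-x") == "false"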
1 change: 1 addition & 0 deletions .gitignore
@@ -11,6 +11,7 @@

# Created by .ignore support plugin (hsz.mobi)
# Maven
+.venv
__pycache__
target/
pom.xml.tag
(file path not shown: MySQL migration script)
@@ -52,3 +52,6 @@ WHERE serviceType IN ('Athena','BigQuery','Mssql','Mysql','Oracle','Postgres','R
update dbservice_entity
set json = JSON_SET(json, '$.connection.config.supportsSystemProfile', true)
where serviceType in ('Snowflake', 'Redshift', 'BigQuery');
+
+-- Update all rows in the consumers_dlq table to set the source column to 'publisher'
+UPDATE consumers_dlq SET source = 'publisher';
(file path not shown: MySQL schema change script)
@@ -6,4 +6,10 @@ CREATE TABLE IF NOT EXISTS apps_data_store (
    identifier VARCHAR(256) NOT NULL,
    type VARCHAR(256) NOT NULL,
    json JSON NOT NULL
-);
+);
+
+-- Add the source column to the consumers_dlq table
+ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255);
+
+-- Create an index on the source column in the consumers_dlq table
+CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
(file path not shown: PostgreSQL migration script)
@@ -68,3 +68,6 @@ WHERE serviceType IN ('Athena','BigQuery','Mssql','Mysql','Oracle','Postgres','R
UPDATE dbservice_entity
SET json = jsonb_set(json::jsonb, '{connection,config,supportsSystemProfile}', 'true'::jsonb)
WHERE serviceType IN ('Snowflake', 'Redshift', 'BigQuery');
+
+-- Update all rows in the consumers_dlq table to set the source column to 'publisher'
+UPDATE consumers_dlq SET source = 'publisher';
(file path not shown: PostgreSQL schema change script)
@@ -6,4 +6,10 @@ CREATE TABLE IF NOT EXISTS apps_data_store (
    identifier VARCHAR(256) NOT NULL,
    type VARCHAR(256) NOT NULL,
    json JSON NOT NULL
-);
+);
+
+-- Add the source column to the consumers_dlq table
+ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255);
+
+-- Create an index on the source column in the consumers_dlq table
+CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source);
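The MySQL and PostgreSQL scripts above make the same three changes, and they only work in one order, since the backfill and the index both reference the new column. A minimal sketch of that sequencing; the SQLAlchemy runner and the connection URL are assumptions for illustration, as OpenMetadata's own migration runner applies these scripts:

from sqlalchemy import create_engine, text

# illustrative URL, not part of the commit
engine = create_engine("postgresql://user:pass@localhost/openmetadata_db")
with engine.begin() as conn:
    # 1. schema change: the column must exist first
    conn.execute(text("ALTER TABLE consumers_dlq ADD COLUMN source VARCHAR(255)"))
    # 2. data migration: backfill existing rows
    conn.execute(text("UPDATE consumers_dlq SET source = 'publisher'"))
    # 3. index the new column
    conn.execute(text("CREATE INDEX idx_consumers_dlq_source ON consumers_dlq (source)"))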
1 change: 1 addition & 0 deletions ingestion/setup.py
@@ -224,6 +224,7 @@
    "elasticsearch": {
        VERSIONS["elasticsearch8"],
    },  # also requires requests-aws4auth which is in base
+    "exasol": {"sqlalchemy_exasol>=5,<6"},
    "glue": {VERSIONS["boto3"]},
    "great-expectations": {VERSIONS["great-expectations"]},
    "greenplum": {*COMMONS["postgres"]},
(file path not shown: column values-to-be-in-set validator)
@@ -45,6 +45,7 @@ def run_validation(self) -> TestCaseResult:
        Returns:
            TestCaseResult:
        """
+        matched = False
        allowed_values = self.get_test_case_param_value(
            self.test_case.parameterValues,  # type: ignore
            "allowedValues",
@@ -58,11 +59,12 @@
        try:
            column: Union[SQALikeColumn, Column] = self._get_column_name()
            res = self._run_results(Metrics.COUNT_IN_SET, column, values=allowed_values)
+            matched = res > 0
            if match_enum:
                count = self._run_results(
                    Metrics.ROW_COUNT, column, values=allowed_values
                )
-                res = count - res
+                matched = count - res == 0
        except (ValueError, RuntimeError) as exc:
            msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}"  # type: ignore
            logger.debug(traceback.format_exc())
@@ -81,7 +83,7 @@ def run_validation(self) -> TestCaseResult:

        return self.get_test_case_result_object(
            self.execution_date,
-            self.get_test_case_status(res == 0 if match_enum else res >= 1),
+            self.get_test_case_status(matched),
            f"Found countInSet={res}.",
            [TestResultValue(name=ALLOWED_VALUE_COUNT, value=str(res))],
            row_count=row_count,
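Net effect of the matched refactor above: without matchEnum the test passes when at least one row holds an allowed value, with matchEnum every row must. A pure-Python restatement with illustrative values:

def passes(res: int, count: int, match_enum: bool) -> bool:
    # res = COUNT_IN_SET (rows whose value is in allowedValues)
    # count = ROW_COUNT (total rows)
    if match_enum:
        return count - res == 0  # no row outside the allowed set
    return res > 0  # at least one row inside the allowed set

assert passes(res=5, count=5, match_enum=True)
assert not passes(res=4, count=5, match_enum=True)
assert passes(res=1, count=100, match_enum=False)
assert not passes(res=0, count=100, match_enum=False)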
(file path not shown: new empty file)
(file path not shown: Exasol connection module, new file)
@@ -0,0 +1,87 @@
from typing import Optional
from urllib.parse import quote_plus

from pydantic import SecretStr
from sqlalchemy.engine import Engine

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
    ExasolConnection,
)
from metadata.ingestion.connections.builders import (
    create_generic_db_connection,
    get_connection_args_common,
)
from metadata.ingestion.connections.test_connections import test_query
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


def get_connection_url(connection: ExasolConnection) -> str:
    """
    Common method for building the source connection urls
    """

    url = f"{connection.scheme.value}://"

    if connection.username:
        url += f"{quote_plus(connection.username)}"
        connection.password = (
            SecretStr("") if not connection.password else connection.password
        )
        url += (
            f":{quote_plus(connection.password.get_secret_value())}"
            if connection
            else ""
        )
        url += "@"

    url += connection.hostPort

    if hasattr(connection, "databaseSchema"):
        url += f"/{connection.databaseSchema}" if connection.databaseSchema else ""

    tls_settings = {
        "validate-certificate": {},
        "ignore-certificate": {"SSLCertificate": "SSL_VERIFY_NONE"},
        "disable-tls": {"SSLCertificate": "SSL_VERIFY_NONE", "ENCRYPTION": "no"},
    }
    options = tls_settings[connection.tls.value]
    if options:
        if (hasattr(connection, "database") and not connection.database) or (
            hasattr(connection, "databaseSchema") and not connection.databaseSchema
        ):
            url += "/"
        params = "&".join(
            f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
        )
        url = f"{url}?{params}"
    return url


def get_connection(connection: ExasolConnection) -> Engine:
    """
    Create connection
    """
    return create_generic_db_connection(
        connection=connection,
        get_connection_url_fn=get_connection_url,
        get_connection_args_fn=get_connection_args_common,
    )


def test_connection(
    metadata: OpenMetadata,
    engine: Engine,
    service_connection: ExasolConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test connection. This can be executed either as part
    of a metadata workflow or during an Automation Workflow
    """
    test_query(engine, "SELECT 1;")
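A quick illustrative harness for get_connection_url() above, assuming the function is in scope. The stand-in connection object only mimics the attributes the function reads; the scheme and TLS string values are assumptions, not taken from the generated ExasolConnection schema:

from enum import Enum
from types import SimpleNamespace
from pydantic import SecretStr

class Scheme(Enum):
    EXA_WEBSOCKET = "exa+websocket"  # assumed scheme literal

class Tls(Enum):
    VALIDATE = "validate-certificate"  # maps to {} in tls_settings, so no URL params

conn = SimpleNamespace(
    scheme=Scheme.EXA_WEBSOCKET,
    username="sys",
    password=SecretStr("exasol"),
    hostPort="localhost:8563",
    databaseSchema=None,
    tls=Tls.VALIDATE,
)
print(get_connection_url(conn))  # exa+websocket://sys:exasol@localhost:8563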
(file path not shown: Exasol metadata source, new file)
@@ -0,0 +1,27 @@
from typing import Optional, cast

from metadata.generated.schema.entity.services.connections.database.exasolConnection import (
    ExasolConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService


class ExasolSource(CommonDbSourceService):
    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        config: WorkflowSource = WorkflowSource.model_validate(config_dict)
        if config.serviceConnection is None:
            raise InvalidSourceException("Missing service connection")
        connection = cast(ExasolConnection, config.serviceConnection.root.config)
        if not isinstance(connection, ExasolConnection):
            raise InvalidSourceException(
                f"Expected ExasolConnection, but got {connection}"
            )
        return cls(config, metadata)
(file path not shown: Exasol service spec, new file)
@@ -0,0 +1,4 @@
from metadata.ingestion.source.database.exasol.metadata import ExasolSource
from metadata.utils.service_spec.default import DefaultDatabaseSpec

ServiceSpec = DefaultDatabaseSpec(metadata_source_class=ExasolSource)
(file path not shown: Snowflake profiler)
@@ -13,15 +13,14 @@
Profiler for Snowflake
"""
from metadata.ingestion.source.database.snowflake.profiler.system import (
-    SnowflakeSystemMetricsSource,
+    SnowflakeSystemMetricsComputer,
)
from metadata.profiler.interface.sqlalchemy.snowflake.profiler_interface import (
    SnowflakeProfilerInterface,
)
+from metadata.profiler.metrics.system.system import SystemMetricsComputer


class SnowflakeProfiler(SnowflakeProfilerInterface):
-    def initialize_system_metrics_computer(
-        self, **kwargs
-    ) -> SnowflakeSystemMetricsSource:
-        return SnowflakeSystemMetricsSource(session=self.session)
+    def initialize_system_metrics_computer(self, **kwargs) -> SystemMetricsComputer:
+        return SnowflakeSystemMetricsComputer(session=self.session)
(file path not shown: Snowflake system metrics source)
@@ -17,7 +17,9 @@
    CacheProvider,
    EmptySystemMetricsSource,
    SQASessionProvider,
+    SystemMetricsComputer,
)
+from metadata.utils.collections import CaseInsensitiveString
from metadata.utils.logger import profiler_logger
from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
from metadata.utils.profiler_utils import get_identifiers_from_string
@@ -222,7 +224,7 @@ def get_snowflake_system_queries(
    """

    try:
-        logger.debug(f"Trying to parse query [{query_log_entry.query_id}]")
+        logger.debug(f"Parsing snowflake query [{query_log_entry.query_id}]")
        identifier = _parse_query(query_log_entry.query_text)
        if not identifier:
            raise RuntimeError("Could not identify the table from the query.")
@@ -358,9 +360,17 @@ def get_system_profile(
                }
                for q in query_results
                if getattr(q, rows_affected_field) > 0
-                and q.database_name == db
-                and q.schema_name == schema
-                and q.table_name == table
+                # snowflake SQL identifiers are case insensitive. All identifiers are stored in upper case.
+                and (
+                    CaseInsensitiveString(db),
+                    CaseInsensitiveString(schema),
+                    CaseInsensitiveString(table),
+                )
+                == (
+                    q.database_name,
+                    q.schema_name,
+                    q.table_name,
+                )
            ]
        )
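The tuple comparison above relies on CaseInsensitiveString equality: Snowflake stores unquoted identifiers in upper case, so the requested db/schema/table names must match case-insensitively. A minimal stand-in sketch; the real helper lives in metadata.utils.collections:

class CaseInsensitiveString(str):
    # stand-in: equality ignores case, mirroring how the real helper lets
    # lower-case FQN parts match Snowflake's upper-case identifiers
    def __eq__(self, other):
        return isinstance(other, str) and self.casefold() == str(other).casefold()

    def __hash__(self):
        return hash(self.casefold())

key = (
    CaseInsensitiveString("my_db"),
    CaseInsensitiveString("public"),
    CaseInsensitiveString("orders"),
)
print(key == ("MY_DB", "PUBLIC", "ORDERS"))  # True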

@@ -387,3 +397,9 @@ def get_queries(self, table: str) -> List[SnowflakeQueryResult]:
            for row in queries
        ]
        return [result for result in results if result is not None]
+
+
+class SnowflakeSystemMetricsComputer(
+    SystemMetricsComputer, SnowflakeSystemMetricsSource
+):
+    pass
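The new SnowflakeSystemMetricsComputer is composition by MRO: the generic computer drives the computation and the Snowflake source supplies engine-specific queries. A self-contained sketch of that shape; the class bodies and the shared base are simplified stand-ins, not the real implementations:

class EmptySystemMetricsSource:
    def get_queries(self):
        # default: no engine-specific system queries
        return []

class SystemMetricsComputer(EmptySystemMetricsSource):
    def get_system_metrics(self):
        # dispatches through the MRO, so an engine mixin can take over
        return self.get_queries()

class SnowflakeSystemMetricsSource(EmptySystemMetricsSource):
    def get_queries(self):
        return ["snowflake query-history rows"]

class SnowflakeSystemMetricsComputer(
    SystemMetricsComputer, SnowflakeSystemMetricsSource
):
    pass

print(SnowflakeSystemMetricsComputer().get_system_metrics())
# ['snowflake query-history rows']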
11 changes: 6 additions & 5 deletions ingestion/tests/cli_e2e/base/test_cli_db.py
@@ -421,8 +421,6 @@ def assert_status_for_data_quality(self, source_status, sink_status):

    def system_profile_assertions(self):
        cases = self.get_system_profile_cases()
-        if not cases:
-            return
        for table_fqn, expected_profile in cases:
            actual_profiles = self.openmetadata.get_profile_data(
                table_fqn,
@@ -431,10 +429,13 @@ def system_profile_assertions(self):
                profile_type=SystemProfile,
            ).entities
            actual_profiles = sorted(
-                actual_profiles, key=lambda x: x.timestamp.root
+                actual_profiles, key=lambda x: (x.timestamp.root, x.operation.value)
            )
+            expected_profile = sorted(
+                expected_profile,
+                key=lambda x: (x.timestamp.root, x.operation.value),
+            )
            actual_profiles = actual_profiles[-len(expected_profile) :]
-            assert len(expected_profile) == len(actual_profiles)
+            assert len(actual_profiles) >= len(expected_profile)
            for expected, actual in zip(expected_profile, actual_profiles):
                try:
                    assert_equal_pydantic_objects(
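Why the secondary sort key above matters: profiles flushed in the same run can tie on timestamp, so ordering by operation as well keeps the expected/actual zip deterministic. Illustrative values:

# (timestamp, operation) rows; timestamps tie, so the secondary key decides
rows = [(1730, "UPDATE"), (1730, "INSERT"), (1731, "DELETE")]
print(sorted(rows, key=lambda r: (r[0], r[1])))
# [(1730, 'INSERT'), (1730, 'UPDATE'), (1731, 'DELETE')]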
2 changes: 1 addition & 1 deletion ingestion/tests/cli_e2e/test_cli_bigquery.py
@@ -12,7 +12,7 @@
"""
Test Bigquery connector with CLI
"""
-from typing import List
+from typing import List, Tuple

from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
from metadata.generated.schema.type.basic import Timestamp
(remaining changed files not shown)
