Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove RESULT_SCAN dep. #13904

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@
from sqlalchemy.engine.row import Row

from metadata.utils.logger import profiler_logger
from metadata.utils.profiler_utils import QueryResult, get_identifiers_from_string
from metadata.utils.profiler_utils import (
SnowflakeQueryResult,
get_identifiers_from_string,
)

logger = profiler_logger()

INFORMATION_SCHEMA_QUERY = """
SELECT * FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
SELECT
QUERY_ID,
QUERY_TEXT,
QUERY_TYPE,
START_TIME,
ROWS_INSERTED,
ROWS_UPDATED,
ROWS_DELETED
FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"
WHERE
start_time>= DATEADD('DAY', -1, CURRENT_TIMESTAMP)
AND QUERY_TEXT ILIKE '%{tablename}%'
Expand All @@ -46,7 +57,7 @@

def get_snowflake_system_queries(
row: Row, database: str, schema: str
) -> Optional[QueryResult]:
) -> Optional[SnowflakeQueryResult]:
"""get snowflake system queries for a specific database and schema. Parsing the query
is the only reliable way to get the DDL operation as fields in the table are not. If parsing
fails we'll fall back to regex lookup
Expand Down Expand Up @@ -87,14 +98,19 @@ def get_snowflake_system_queries(
database.lower() == database_name.lower()
and schema.lower() == schema_name.lower()
):
return QueryResult(
return SnowflakeQueryResult(
query_id=dict_row.get("QUERY_ID", dict_row.get("query_id")),
database_name=database_name.lower(),
schema_name=schema_name.lower(),
table_name=table_name.lower(),
query_text=query_text,
query_type=dict_row.get("QUERY_TYPE", dict_row.get("query_type")),
timestamp=dict_row.get("START_TIME", dict_row.get("start_time")),
rows_inserted=dict_row.get(
"ROWS_INSERTED", dict_row.get("rows_inserted")
),
rows_updated=dict_row.get("ROWS_UPDATED", dict_row.get("rows_updated")),
rows_deleted=dict_row.get("ROWS_DELETED", dict_row.get("rows_deleted")),
)
except Exception:
logger.debug(traceback.format_exc())
Expand Down
33 changes: 26 additions & 7 deletions ingestion/src/metadata/profiler/metrics/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import traceback
from collections import defaultdict
from textwrap import dedent
from typing import Dict, List, Optional

from sqlalchemy import text
Expand All @@ -41,7 +40,6 @@
)
from metadata.profiler.metrics.system.queries.snowflake import (
INFORMATION_SCHEMA_QUERY,
RESULT_SCAN,
get_snowflake_system_queries,
)
from metadata.profiler.orm.registry import Dialects
Expand Down Expand Up @@ -335,16 +333,37 @@ def _(
query_results.append(result)

for query_result in query_results:
cursor_for_result_scan = session.execute(
text(dedent(RESULT_SCAN.format(query_id=query_result.query_id)))
)
row_for_result_scan = cursor_for_result_scan.first()
rows_affected = None
if query_result.query_type == DatabaseDMLOperations.INSERT.value:
rows_affected = query_result.rows_inserted
if query_result.query_type == DatabaseDMLOperations.DELETE.value:
rows_affected = query_result.rows_deleted
if query_result.query_type == DatabaseDMLOperations.UPDATE.value:
rows_affected = query_result.rows_updated
if query_result.query_type == DatabaseDMLOperations.MERGE.value:
if query_result.rows_inserted:
metric_results.append(
{
"timestamp": int(query_result.timestamp.timestamp() * 1000),
"operation": DatabaseDMLOperations.INSERT.value,
"rowsAffected": query_result.rows_inserted,
}
)
if query_result.rows_updated:
metric_results.append(
{
"timestamp": int(query_result.timestamp.timestamp() * 1000),
"operation": DatabaseDMLOperations.UPDATE.value,
"rowsAffected": query_result.rows_updated,
}
)
continue

metric_results.append(
{
"timestamp": int(query_result.timestamp.timestamp() * 1000),
"operation": DML_OPERATION_MAP.get(query_result.query_type),
"rowsAffected": row_for_result_scan[0] if row_for_result_scan else None,
"rowsAffected": rows_affected,
}
)

Expand Down
8 changes: 8 additions & 0 deletions ingestion/src/metadata/utils/profiler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class QueryResult(BaseModel):
rows: Optional[int] = None


class SnowflakeQueryResult(QueryResult):
"""Snowflake system metric query result"""

rows_inserted: Optional[int] = None
rows_updated: Optional[int] = None
rows_deleted: Optional[int] = None


def clean_up_query(query: str) -> str:
"""remove comments and newlines from query"""
return sqlparse.format(query, strip_comments=True).replace("\\n", "")
Expand Down
Loading