feat(ingestion): Redshift Usage Source - simplify OperationalStats workunit generation. (#4585)

rslanka authored Apr 7, 2022
1 parent 55f0412 commit 5e25cd1
Showing 5 changed files with 104 additions and 148 deletions.
@@ -2,7 +2,7 @@
import dataclasses
import logging
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Set, Union
from typing import Dict, Iterable, List, Optional, Set

from pydantic import Field
from pydantic.main import BaseModel
@@ -29,18 +29,18 @@

logger = logging.getLogger(__name__)

redshift_datetime_format = "%Y-%m-%d %H:%M:%S"
REDSHIFT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


# add this join to the sql comment for more metrics on completed queries
# Add this join to the sql query for more metrics on completed queries
# LEFT JOIN svl_query_metrics_summary sqms ON ss.query = sqms.query
# Reference: https://docs.aws.amazon.com/redshift/latest/dg/r_SVL_QUERY_METRICS_SUMMARY.html

# this sql query joins stl_scan over table info,
# querytext, and user info to get usage stats
# using non-LEFT joins here to limit the results to
# queries run by the user on user-defined tables.
redshift_usage_sql_comment = """
REDSHIFT_USAGE_QUERY_TEMPLATE: str = """
SELECT DISTINCT ss.userid as userid,
ss.query as query,
sui.usename as username,
@@ -62,6 +62,57 @@
ORDER BY ss.endtime DESC;
""".strip()

REDSHIFT_OPERATION_ASPECT_QUERY_TEMPLATE: str = """
(SELECT
DISTINCT si.userid AS userid,
si.query AS query,
si.rows AS rows,
sui.usename AS username,
si.tbl AS tbl,
sq.querytxt AS querytxt,
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
sq.starttime AS starttime,
sq.endtime AS endtime,
'insert' AS operation_type
FROM
stl_insert si
JOIN svv_table_info sti ON si.tbl = sti.table_id
JOIN stl_query sq ON si.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE
si.starttime >= '{start_time}'
AND si.starttime < '{end_time}'
AND si.rows > 0
AND sq.aborted = 0)
UNION
(SELECT
sd.rows AS ROWS,
sui.usename AS username,
sd.tbl AS tbl,
sq.querytxt AS querytxt,
sti.database AS database,
sti.schema AS schema,
sti.table AS table,
sq.starttime AS starttime,
sq.endtime AS endtime,
'delete' AS operation_type
FROM
stl_delete sd
JOIN svv_table_info sti ON sd.tbl = sti.table_id
JOIN stl_query sq ON sd.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE
sd.starttime >= '{start_time}'
AND sd.starttime < '{end_time}'
AND sd.rows > 0
AND sq.aborted = 0)
ORDER BY
endtime DESC
""".strip()

RedshiftTableRef = str
AggregatedDataset = GenericAggregatedDataset[RedshiftTableRef]
@@ -77,6 +128,7 @@ class RedshiftAccessEvent(BaseModel):
database: str
schema_: str = Field(alias="schema")
table: str
operation_type: Optional[str] = None
starttime: datetime
endtime: datetime
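
The new operation_type field is Optional because only the operation-aspect query selects an operation_type column; rows produced by the aggregate-usage query leave it as None. The schema_/alias pairing is needed because a field literally named schema would shadow pydantic v1's BaseModel.schema() method. A minimal, self-contained sketch (hypothetical model name; pydantic v1 populates aliased fields by alias by default):

from pydantic import Field
from pydantic.main import BaseModel

class AccessEventSketch(BaseModel):
    schema_: str = Field(alias="schema")  # attribute name avoids BaseModel.schema()

event = AccessEventSketch(**{"schema": "public"})
assert event.schema_ == "public"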

@@ -93,6 +145,8 @@ def get_sql_alchemy_url(self):
@dataclasses.dataclass
class RedshiftUsageSourceReport(SourceReport):
filtered: Set[str] = dataclasses.field(default_factory=set)
num_usage_workunits_emitted: Optional[int] = None
num_operational_stats_workunits_emitted: Optional[int] = None

def report_dropped(self, key: str) -> None:
self.filtered.add(key)
@@ -117,84 +171,44 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self._gen_operation_aspect_workunits(engine)

# Generate aggregate events
query: str = REDSHIFT_USAGE_QUERY_TEMPLATE.format(
start_time=self.config.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
end_time=self.config.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
database=self.config.database,
)
access_events_iterable: Iterable[
RedshiftAccessEvent
] = self._gen_access_events_from_history_query(
self._make_usage_query(redshift_usage_sql_comment), engine
)
] = self._gen_access_events_from_history_query(query, engine)

aggregated_info: AggregatedAccessEvents = self._aggregate_access_events(
aggregated_events: AggregatedAccessEvents = self._aggregate_access_events(
access_events_iterable
)
# Generate usage workunits from aggregated events.
for time_bucket in aggregated_info.values():
self.report.num_usage_workunits_emitted = 0
for time_bucket in aggregated_events.values():
for aggregate in time_bucket.values():
wu: MetadataWorkUnit = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
self.report.num_usage_workunits_emitted += 1
yield wu

def _gen_operation_aspect_workunits_by_type(
self, operation_type: Union[str, "OperationTypeClass"], engine: Engine
) -> Iterable[MetadataWorkUnit]:
# Determine the table to query
op_to_table_map: Dict[str, str] = {
OperationTypeClass.INSERT: "stl_insert",
OperationTypeClass.DELETE: "stl_delete",
}
table_name: Optional[str] = op_to_table_map.get(str(operation_type))
assert table_name is not None
# Get the access events for the table corresponding to the operation.
access_events_iterable: Iterable[
RedshiftAccessEvent
] = self._gen_access_events_from_history_query(
self._make_redshift_operation_aspect_query(table_name), engine
)
# Generate operation aspect work units from the access events
yield from self._gen_operation_aspect_workunits_by_type_from_access_events(
access_events_iterable, operation_type
)

def _gen_operation_aspect_workunits(
self, engine: Engine
) -> Iterable[MetadataWorkUnit]:
yield from self._gen_operation_aspect_workunits_by_type(
OperationTypeClass.INSERT, engine
)
yield from self._gen_operation_aspect_workunits_by_type(
OperationTypeClass.DELETE, engine
# Generate access events
query: str = REDSHIFT_OPERATION_ASPECT_QUERY_TEMPLATE.format(
start_time=self.config.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
end_time=self.config.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
)
access_events_iterable: Iterable[
RedshiftAccessEvent
] = self._gen_access_events_from_history_query(query, engine)

def _make_usage_query(self, query: str) -> str:
return query.format(
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
database=self.config.database,
# Generate operation aspect work units from the access events
yield from self._gen_operation_aspect_workunits_from_access_events(
access_events_iterable
)

def _make_redshift_operation_aspect_query(self, table_name: str) -> str:
return f"""
SELECT DISTINCT ss.userid as userid,
ss.query as query,
ss.rows as rows,
sui.usename as username,
ss.tbl as tbl,
sq.querytxt as querytxt,
sti.database as database,
sti.schema as schema,
sti.table as table,
sq.starttime as starttime,
sq.endtime as endtime
FROM {table_name} ss
JOIN svv_table_info sti ON ss.tbl = sti.table_id
JOIN stl_query sq ON ss.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{self.config.start_time.strftime(redshift_datetime_format)}'
AND ss.starttime < '{self.config.end_time.strftime(redshift_datetime_format)}'
AND ss.rows > 0
AND sq.aborted = 0
ORDER BY ss.endtime DESC;
""".strip()

def _make_sql_engine(self) -> Engine:
url: str = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url = {url}")
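
Taken together, the change in this file replaces the old per-operation-type flow (one SQL round trip for inserts, another for deletes, with the operation type threaded alongside the events) by a single UNION query whose rows carry their own operation_type. A condensed sketch of the resulting flow, abbreviated from the code above rather than a verbatim copy:

def _gen_operation_aspect_workunits(
    self, engine: Engine
) -> Iterable[MetadataWorkUnit]:
    # One round trip instead of two: the UNION query tags each row
    # with 'insert' or 'delete' in its operation_type column.
    query = REDSHIFT_OPERATION_ASPECT_QUERY_TEMPLATE.format(
        start_time=self.config.start_time.strftime(REDSHIFT_DATETIME_FORMAT),
        end_time=self.config.end_time.strftime(REDSHIFT_DATETIME_FORMAT),
    )
    events = self._gen_access_events_from_history_query(query, engine)
    yield from self._gen_operation_aspect_workunits_from_access_events(events)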
@@ -238,30 +252,36 @@ def _gen_access_events_from_history_query(
access_event.database = self.config.database_alias
yield access_event

def _gen_operation_aspect_workunits_by_type_from_access_events(
def _gen_operation_aspect_workunits_from_access_events(
self,
events_iterable: Iterable[RedshiftAccessEvent],
operation_type: Union[str, "OperationTypeClass"],
) -> Iterable[MetadataWorkUnit]:
self.report.num_operational_stats_workunits_emitted = 0
for event in events_iterable:
if not (
event.database
and event.username
and event.schema_
and event.table
and event.endtime
and event.operation_type
):
continue

assert event.operation_type in ["insert", "delete"]

resource: str = f"{event.database}.{event.schema_}.{event.table}"
last_updated_timestamp: int = int(event.endtime.timestamp() * 1000)
user_email: str = event.username

operation_aspect = OperationClass(
timestampMillis=last_updated_timestamp,
lastUpdatedTimestamp=last_updated_timestamp,
actor=builder.make_user_urn(user_email.split("@")[0]),
operationType=operation_type,
operationType=(
OperationTypeClass.INSERT
if event.operation_type == "insert"
else OperationTypeClass.DELETE
),
)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
@@ -280,6 +300,7 @@ def _gen_operation_aspect_workunits_by_type_from_access_events(
mcp=mcp,
)
self.report.report_workunit(wu)
self.report.num_operational_stats_workunits_emitted += 1
yield wu

def _aggregate_access_events(

@@ -18,25 +18,6 @@
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",

@@ -37,63 +37,6 @@
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"INSERT\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1629795600000,
"runId": "test-redshift-usage",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
@@ -1,12 +1,15 @@
import json
import pathlib
from typing import Dict, List
from typing import Dict, List, cast
from unittest.mock import patch

from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.usage.redshift_usage import RedshiftUsageConfig
from datahub.ingestion.source.usage.redshift_usage import (
RedshiftUsageConfig,
RedshiftUsageSourceReport,
)
from tests.test_helpers import mce_helpers

FROZEN_TIME = "2021-08-24 09:00:00"
@@ -76,8 +79,13 @@ def test_redshift_usage_source(pytestconfig, tmp_path):
pipeline.run()
pipeline.raise_from_status()

# There should be 3 calls (usage aspects -1, insert operation aspects -1, delete operation aspects - 1).
assert mock_engine_execute.call_count == 3
# There should be 2 calls (usage aspects -1, operation aspects -1).
assert mock_engine_execute.call_count == 2
source_report: RedshiftUsageSourceReport = cast(
RedshiftUsageSourceReport, pipeline.source.get_report()
)
assert source_report.num_usage_workunits_emitted == 3
assert source_report.num_operational_stats_workunits_emitted == 3
mce_helpers.check_golden_file(
pytestconfig=pytestconfig,
output_path=tmp_path / "redshift_usages.json",