remove user_email_pattern from AggregatedDataset
hsheth2 committed Mar 1, 2023
1 parent 0f895ec commit 03a0a50
Showing 5 changed files with 28 additions and 41 deletions.
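The five files appear to be the BigQuery, Snowflake, and Redshift usage sources, usage_common.py, and test_usage_common.py (exact paths are not shown for most of them). In outline, the commit drops the user_email_pattern field from GenericAggregatedDataset and instead passes the pattern to add_read_entry at each call site, promotes the query-list budget to a module-level TOTAL_BUDGET_FOR_QUERY_LIST constant, and swaps the per-instance trim_query method for a helper imported from datahub.utilities.sql_formatter. A minimal sketch of the caller-side change; resource, floored_ts, config, user_email, query, and fields are illustrative placeholders, not names from the diff:

    # Sketch only; variables below are placeholders for illustration.

    # Before this commit: the email filter lived on the aggregate itself.
    agg = AggregatedDataset(
        bucket_start_time=floored_ts,
        resource=resource,
        user_email_pattern=config.user_email_pattern,
    )
    agg.add_read_entry(user_email, query, fields)

    # After this commit: the aggregate carries no filter; each read entry is
    # filtered at insertion time instead.
    agg = AggregatedDataset(bucket_start_time=floored_ts, resource=resource)
    agg.add_read_entry(
        user_email,
        query,
        fields,
        user_email_pattern=config.user_email_pattern,
    )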
@@ -788,14 +788,14 @@ def _aggregate_enriched_read_events(
AggregatedDataset(
bucket_start_time=floored_ts,
resource=resource,
user_email_pattern=self.config.usage.user_email_pattern,
),
)

agg_bucket.add_read_entry(
event.read_event.actor_email,
event.query_event.query if event.query_event else None,
event.read_event.fieldsRead,
user_email_pattern=self.config.usage.user_email_pattern,
)

def get_workunits(
@@ -23,6 +23,7 @@
SnowflakePermissionError,
SnowflakeQueryMixin,
)
from datahub.ingestion.source.usage.usage_common import TOTAL_BUDGET_FOR_QUERY_LIST
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetFieldUsageCounts,
DatasetUsageStatistics,
@@ -218,9 +219,8 @@ def build_usage_statistics_for_dataset(self, dataset_identifier, row):
)

def _map_top_sql_queries(self, top_sql_queries: Dict) -> List[str]:
total_budget_for_query_list: int = 24000
budget_per_query: int = int(
total_budget_for_query_list / self.config.top_n_queries
TOTAL_BUDGET_FOR_QUERY_LIST / self.config.top_n_queries
)
return sorted(
[
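The Snowflake hunk above swaps a hard-coded local budget of 24000 for the shared TOTAL_BUDGET_FOR_QUERY_LIST constant. A quick sketch of the arithmetic, with an assumed top_n_queries value of 10 for illustration:

    TOTAL_BUDGET_FOR_QUERY_LIST = 24000  # shared constant defined in usage_common
    top_n_queries = 10                   # assumed config value, not from the diff
    budget_per_query = int(TOTAL_BUDGET_FOR_QUERY_LIST / top_n_queries)  # -> 2400 characters per query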
@@ -373,7 +373,6 @@ def _aggregate_access_events(
AggregatedDataset(
bucket_start_time=floored_ts,
resource=resource,
user_email_pattern=self.config.user_email_pattern,
),
)
# current limitation in user stats UI, we need to provide email to show users
@@ -385,6 +384,7 @@
user_email,
event.text,
[], # TODO: not currently supported by redshift; find column level changes
user_email_pattern=self.config.user_email_pattern,
)
return datasets

@@ -2,7 +2,7 @@
import dataclasses
import logging
from datetime import datetime
from typing import Callable, Counter, Generic, List, Optional, TypeVar
from typing import Callable, ClassVar, Counter, Generic, List, Optional, TypeVar

import pydantic
from pydantic.fields import Field
@@ -21,18 +21,20 @@
DatasetUserUsageCountsClass,
TimeWindowSizeClass,
)
from datahub.utilities.sql_formatter import format_sql_query
from datahub.utilities.sql_formatter import format_sql_query, trim_query

logger = logging.getLogger(__name__)

ResourceType = TypeVar("ResourceType")

# The total number of characters allowed across all queries in a single workunit.
TOTAL_BUDGET_FOR_QUERY_LIST = 24000


@dataclasses.dataclass
class GenericAggregatedDataset(Generic[ResourceType]):
bucket_start_time: datetime
resource: ResourceType
user_email_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()

readCount: int = 0
queryCount: int = 0
@@ -41,17 +43,14 @@ class GenericAggregatedDataset(Generic[ResourceType]):
userFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
columnFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)

total_budget_for_query_list: int = 24000
query_trimmer_string_space: int = 10
query_trimmer_string: str = " ..."

def add_read_entry(
self,
user_email: str,
query: Optional[str],
fields: List[str],
user_email_pattern: AllowDenyPattern = AllowDenyPattern.allow_all(),
) -> None:
if not self.user_email_pattern.allowed(user_email):
if not user_email_pattern.allowed(user_email):
return

self.readCount += 1
@@ -63,37 +62,26 @@ def add_read_entry(
for column in fields:
self.columnFreq[column] += 1

def trim_query(self, query: str, budget_per_query: int) -> str:
trimmed_query = query
if len(query) > budget_per_query:
if budget_per_query - self.query_trimmer_string_space > 0:
end_index = budget_per_query - self.query_trimmer_string_space
trimmed_query = query[:end_index] + self.query_trimmer_string
else:
raise Exception(
"Budget per query is too low. Please, decrease the number of top_n_queries."
)
return trimmed_query

def make_usage_workunit(
self,
bucket_duration: BucketDuration,
urn_builder: Callable[[ResourceType], str],
top_n_queries: int,
format_sql_queries: bool,
include_top_n_queries: bool,
total_budget_for_query_list: int = TOTAL_BUDGET_FOR_QUERY_LIST,
query_trimmer_string: str = " ...",
) -> MetadataWorkUnit:
top_sql_queries: Optional[List[str]] = None
if include_top_n_queries:
budget_per_query: int = int(
self.total_budget_for_query_list / top_n_queries
)
budget_per_query: int = int(total_budget_for_query_list / top_n_queries)
top_sql_queries = [
self.trim_query(
trim_query(
format_sql_query(query, keyword_case="upper", reindent_aligned=True)
if format_sql_queries
else query,
budget_per_query,
budget_per_query=budget_per_query,
query_trimmer_string=query_trimmer_string,
)
for query, _ in self.queryFreq.most_common(top_n_queries)
]
@@ -159,13 +147,8 @@ class BaseUsageConfig(BaseTimeWindowConfig):
def ensure_top_n_queries_is_not_too_big(cls, v: int) -> int:
minimum_query_size = 20

max_queries = int(
GenericAggregatedDataset.total_budget_for_query_list / minimum_query_size
)
if (
int(GenericAggregatedDataset.total_budget_for_query_list / v)
< minimum_query_size
):
max_queries = int(TOTAL_BUDGET_FOR_QUERY_LIST / minimum_query_size)
if v > max_queries:
raise ValueError(
f"top_n_queries is set to {v} but it can be maximum {max_queries}"
)
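The per-instance trim_query method deleted above is replaced by a module-level trim_query imported from datahub.utilities.sql_formatter, whose body is not shown in this diff. The sketch below is a reconstruction inferred from the removed method, the new call site, and the updated test expectation, so the actual helper may differ in detail:

    def trim_query(
        query: str, budget_per_query: int, query_trimmer_string: str = " ..."
    ) -> str:
        # Return the query unchanged if it fits the budget; otherwise truncate it
        # so that the trimmed text plus the trimmer suffix stays within the budget.
        if len(query) <= budget_per_query:
            return query
        end_index = budget_per_query - len(query_trimmer_string)
        if end_index <= 0:
            raise Exception(
                "Budget per query is too low. Please, decrease the number of top_n_queries."
            )
        return query[:end_index] + query_trimmer_string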
14 changes: 9 additions & 5 deletions metadata-ingestion/tests/unit/test_usage_common.py
@@ -55,16 +55,17 @@ def test_add_one_query_with_ignored_user():
event_time = datetime(2020, 1, 1)
floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
resource = "test_db.test_schema.test_table"
user_email_pattern = AllowDenyPattern(deny=list(["test_email@test.com"]))

ta = _TestAggregatedDataset(
bucket_start_time=floored_ts,
resource=resource,
user_email_pattern=AllowDenyPattern(deny=list(["test_email@test.com"])),
)
ta.add_read_entry(
test_email,
test_query,
[],
user_email_pattern=user_email_pattern,
)

assert ta.queryCount == 0
@@ -81,26 +82,29 @@ def test_multiple_query_with_ignored_user():
event_time = datetime(2020, 1, 1)
floored_ts = get_time_bucket(event_time, BucketDuration.DAY)
resource = "test_db.test_schema.test_table"
user_email_pattern = AllowDenyPattern(deny=list(["test_email@test.com"]))

ta = _TestAggregatedDataset(
bucket_start_time=floored_ts,
resource=resource,
user_email_pattern=AllowDenyPattern(deny=list(["test_email@test.com"])),
)
ta.add_read_entry(
test_email,
test_query,
[],
user_email_pattern=user_email_pattern,
)
ta.add_read_entry(
test_email,
test_query,
[],
user_email_pattern=user_email_pattern,
)
ta.add_read_entry(
test_email2,
test_query2,
[],
user_email_pattern=user_email_pattern,
)

assert ta.queryCount == 1
@@ -215,7 +219,6 @@ def test_query_trimming():
resource = "test_db.test_schema.test_table"

ta = _TestAggregatedDataset(bucket_start_time=floored_ts, resource=resource)
ta.total_budget_for_query_list = total_budget_for_query_list
ta.add_read_entry(
test_email,
test_query,
@@ -227,20 +230,21 @@
top_n_queries=top_n_queries,
format_sql_queries=False,
include_top_n_queries=True,
total_budget_for_query_list=total_budget_for_query_list,
)

assert wu.id == "2020-01-01T00:00:00-test_db.test_schema.test_table"
assert isinstance(wu.get_metadata()["metadata"], MetadataChangeProposalWrapper)
du: DatasetUsageStatisticsClass = wu.get_metadata()["metadata"].aspect
assert du.totalSqlQueries == 1
assert du.topSqlQueries
assert du.topSqlQueries.pop() == "select * f ..."
assert du.topSqlQueries.pop() == "select * from te ..."


def test_top_n_queries_validator_fails():
with pytest.raises(ValidationError) as excinfo:
with mock.patch(
"datahub.ingestion.source.usage.usage_common.GenericAggregatedDataset.total_budget_for_query_list",
"datahub.ingestion.source.usage.usage_common.TOTAL_BUDGET_FOR_QUERY_LIST",
20,
):
BaseUsageConfig(top_n_queries=2)
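For the validator test just above: with TOTAL_BUDGET_FOR_QUERY_LIST patched to 20 and the validator's minimum_query_size of 20, the cap works out to one allowed query, so top_n_queries=2 triggers the expected ValidationError. A compact restatement of that arithmetic:

    # Arithmetic behind the expected failure (values taken from the patched
    # constant and the validator shown earlier in this diff).
    minimum_query_size = 20
    max_queries = int(20 / minimum_query_size)  # = 1, so top_n_queries=2 is rejected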
