Skip to content

Commit

Permalink
feat: support popular_resources() in mysql_proxy (amundsen-io#1488)
Browse files Browse the repository at this point in the history
* support popular_resources endpoint in mysql_proxy for table and dashboard

Signed-off-by: xuans <xuan_shen@outlook.com>

* refactor code

Signed-off-by: xuans <xuan_shen@outlook.com>

* fix mypy issue and some refactoring

Signed-off-by: xuans <xuan_shen@outlook.com>
  • Loading branch information
xuan616 authored Sep 14, 2021
1 parent ed20ce0 commit e491c1f
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 66 deletions.
13 changes: 6 additions & 7 deletions metadata/metadata_service/client/rds_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import amundsen_rds
from alembic import command, script
from alembic.config import Config
from alembic.runtime import migration
from alembic.runtime.migration import MigrationContext
from amundsen_rds.models.base import Base
from sqlalchemy import create_engine
Expand Down Expand Up @@ -55,11 +54,11 @@ def drop_models(self) -> None:

Base.metadata.drop_all(self.engine)

with self.engine.connect() as connection:
migration_ctx = MigrationContext.configure(connection)
with self.engine.connect() as conn:
migration_ctx = MigrationContext.configure(conn)
version = migration_ctx._version
if version.exists(connection):
version.drop(connection)
if version.exists(conn):
version.drop(conn)

def validate_schema_version(self) -> bool:
"""
Expand All @@ -70,8 +69,8 @@ def validate_schema_version(self) -> bool:
config = self._get_alembic_config()
script_directory = script.ScriptDirectory.from_config(config)

with self.engine.begin() as conn:
context = migration.MigrationContext.configure(conn)
with self.engine.connect() as conn:
context = MigrationContext.configure(conn)
current_version = context.get_current_revision()
current_head = script_directory.get_current_head()

Expand Down
237 changes: 192 additions & 45 deletions metadata/metadata_service/proxy/mysql_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from random import randint
from typing import Any, Dict, List, Optional, Tuple, Type, Union

from amundsen_common.entity.resource_type import ResourceType
from amundsen_common.entity.resource_type import ResourceType, to_resource_type
from amundsen_common.models.dashboard import DashboardSummary
from amundsen_common.models.feature import Feature
from amundsen_common.models.generation_code import GenerationCode
Expand All @@ -16,8 +16,8 @@
from amundsen_common.models.table import Badge
from amundsen_common.models.table import Badge as TableBadge
from amundsen_common.models.table import (Column, ProgrammaticDescription,
Reader, Source, Stat, Table, Tag,
User, Watermark)
Reader, Source, Stat, Table,
TableSummary, Tag, User, Watermark)
from amundsen_common.models.user import User as UserEntity
from amundsen_common.models.user import UserSchema
from amundsen_rds.models import RDSModel
Expand All @@ -38,8 +38,6 @@
from amundsen_rds.models.dashboard import \
DashboardFollower as RDSDashboardFollower
from amundsen_rds.models.dashboard import DashboardGroup as RDSDashboardGroup
from amundsen_rds.models.dashboard import \
DashboardGroupDescription as RDSDashboardGroupDescription
from amundsen_rds.models.dashboard import DashboardOwner as RDSDashboardOwner
from amundsen_rds.models.dashboard import DashboardQuery as RDSDashboardQuery
from amundsen_rds.models.dashboard import DashboardTable as RDSDashboardTable
Expand Down Expand Up @@ -78,7 +76,7 @@
_CACHE = CacheManager(**parse_cache_config_options({'cache.type': 'memory'}))

# Expire cache every 11 hours + jitter
_GET_POPULAR_TABLE_CACHE_EXPIRY_SEC = 11 * 60 * 60 + randint(0, 3600)
_GET_POPULAR_RESOURCES_CACHE_EXPIRY_SEC = 11 * 60 * 60 + randint(0, 3600)

resource_relation_model = {
ResourceType.Table: {
Expand Down Expand Up @@ -650,9 +648,9 @@ def get_popular_tables(self, *, num_entries: int, user_id: Optional[str] = None)
:return:
"""
if user_id is None:
table_uris = self._get_global_popular_tables_uris(num_entries=num_entries)
table_uris = self._get_global_popular_resources_uris(num_entries=num_entries)
else:
table_uris = self._get_personal_popular_tables_uris(num_entries=num_entries, user_id=user_id)
table_uris = self._get_personal_popular_resources_uris(num_entries=num_entries, user_id=user_id)

if not table_uris:
return []
Expand Down Expand Up @@ -698,66 +696,227 @@ def get_popular_tables(self, *, num_entries: int, user_id: Optional[str] = None)

return popular_tables

@_CACHE.cache('_get_global_popular_tables_uris', _GET_POPULAR_TABLE_CACHE_EXPIRY_SEC)
def _get_global_popular_tables_uris(self, num_entries: int) -> List[str]:
@timer_with_counter
def get_popular_resources(self, *,
num_entries: int,
resource_types: List[str],
user_id: Optional[str] = None) -> Dict[str, List]:
"""
Retrieve popular resources. Because computing popular resources requires a full scan of resource usage,
the popular resource URIs are cached.
:param num_entries:
:param resource_types:
:param user_id:
:return:
"""
Retrieve popular table uris. Will provide tables with top x popularity score.
popular_resources: Dict[str, List] = dict()
for resource in resource_types:
resource_type = to_resource_type(label=resource)
popular_resources[resource_type.name] = list()
if user_id is None:
# Get global popular Table/Dashboard URIs
resource_uris = self._get_global_popular_resources_uris(num_entries=num_entries,
resource_type=resource_type)
else:
# Get personalized popular Table/Dashboard URIs
resource_uris = self._get_personal_popular_resources_uris(num_entries=num_entries,
user_id=user_id,
resource_type=resource_type)

if resource_type == ResourceType.Table:
popular_resources[resource_type.name] = self._get_popular_tables(table_uris=resource_uris)
elif resource_type == ResourceType.Dashboard:
popular_resources[resource_type.name] = self._get_popular_dashboards(dashboard_uris=resource_uris)

return popular_resources

@_CACHE.cache('_get_global_popular_resources_uris', expire=_GET_POPULAR_RESOURCES_CACHE_EXPIRY_SEC)
def _get_global_popular_resources_uris(self,
num_entries: int,
resource_type: ResourceType = ResourceType.Table) -> List[str]:
"""
Retrieve popular resources uris. Will provide resources with top x popularity score.
Popularity score = number of distinct readers * log(total number of reads)
The result of this method will be cached based on the key (num_entries),
and the cache will be expired based on _GET_POPULAR_TABLE_CACHE_EXPIRY_SEC
and the cache will be expired based on _GET_POPULAR_RESOURCES_CACHE_EXPIRY_SEC
:param num_entries:
:param resource_type:
:return:
"""
LOGGER.info('Querying global popular tables URIs')
LOGGER.info('Querying global popular resources URIs')

num_readers = app.config['POPULAR_RESOURCES_MINIMUM_READER_COUNT']
with self.client.create_session() as session:
readers = func.count(RDSTableUsage.user_rk).label('readers')

relation_model = resource_relation_model[resource_type][UserResourceRel.read]
res_key = f'{resource_type.name.lower()}_rk'
res_attr = getattr(relation_model, res_key)
user_attr = getattr(relation_model, 'user_rk')
read_count_attr = getattr(relation_model, 'read_count')

with self.client.create_session() as session:
readers = func.count(user_attr).label('readers')
usage_subquery = session.query(
RDSTableUsage.table_rk,
res_attr.label('res_key'),
readers,
func.sum(RDSTableUsage.read_count).label('total_reads')
).group_by(RDSTableUsage.table_rk).having(readers >= num_readers).subquery()
func.sum(read_count_attr).label('total_reads')
).group_by(res_attr).having(readers >= num_readers).subquery()

popular_usage = session.query(usage_subquery.c.table_rk).order_by(
popular_usage = session.query(usage_subquery.c.res_key).order_by(
(usage_subquery.c.readers * func.log(usage_subquery.c.total_reads)).desc()
).limit(num_entries).all()

return [usage.table_rk for usage in popular_usage]
return [usage.res_key for usage in popular_usage]

@timer_with_counter
@_CACHE.cache('_get_personal_popular_tables_uris', _GET_POPULAR_TABLE_CACHE_EXPIRY_SEC)
def _get_personal_popular_tables_uris(self, num_entries: int, user_id: str) -> List[str]:
@_CACHE.cache('_get_personal_popular_resources_uris', _GET_POPULAR_RESOURCES_CACHE_EXPIRY_SEC)
def _get_personal_popular_resources_uris(self,
num_entries: int,
user_id: str,
resource_type: ResourceType = ResourceType.Table) -> List[str]:
"""
Retrieve personalized popular table uris. Will provide tables with top
Retrieve personalized popular resources uris. Will provide resources with top
popularity score that have been read by a peer of the user_id provided.
The popularity score is defined in the same way as `_get_global_popular_tables_uris`
The popularity score is defined in the same way as `_get_global_popular_resources_uris`
The result of this method will be cached based on the key (num_entries, user_id),
and the cache will be expired based on _GET_POPULAR_TABLE_CACHE_EXPIRY_SEC
and the cache will be expired based on _GET_POPULAR_RESOURCES_CACHE_EXPIRY_SEC
:param num_entries:
:param user_id:
:param resource_type:
:return:
"""
LOGGER.info('Querying personal popular tables URIs')
LOGGER.info('Querying personal popular resources URIs')

num_readers = app.config['POPULAR_RESOURCES_MINIMUM_READER_COUNT']

relation_model = resource_relation_model[resource_type][UserResourceRel.read]
res_key = f'{resource_type.name.lower()}_rk'
res_attr = getattr(relation_model, res_key)
user_attr = getattr(relation_model, 'user_rk')
read_count_attr = getattr(relation_model, 'read_count')

with self.client.create_session() as session:
readers = func.count(RDSTableUsage.user_rk).label('readers')
readers = func.count(user_attr).label('readers')

usage_subquery = session.query(
RDSTableUsage.table_rk,
res_attr.label('res_key'),
readers,
func.sum(RDSTableUsage.read_count).label('total_reads')
func.sum(read_count_attr).label('total_reads')
).filter(
RDSTableUsage.user_rk == user_id
).group_by(RDSTableUsage.table_rk).having(readers >= num_readers).subquery()
user_attr == user_id
).group_by(res_attr).having(readers >= num_readers).subquery()

popular_usage = session.query(usage_subquery.c.table_rk).order_by(
popular_usage = session.query(usage_subquery.c.res_key).order_by(
(usage_subquery.c.readers * func.log(usage_subquery.c.total_reads)).desc()
).limit(num_entries).all()

return [usage.table_rk for usage in popular_usage]
return [usage.res_key for usage in popular_usage]

def _get_popular_tables(self, *, table_uris: List[str]) -> List[TableSummary]:
    """
    Build a TableSummary for every table whose key is listed in table_uris.

    :param table_uris: keys matched against RDSTable.rk
    :return: one TableSummary per table found; an empty list when table_uris is empty
    """
    if not table_uris:
        return []

    # Loader options: fetch only the columns the summary reads, walking
    # table -> schema -> cluster -> database.
    description_opt = subqueryload(RDSTable.description).options(
        load_only(RDSTableDescription.description)
    )
    database_opt = subqueryload(RDSCluster.database).options(
        load_only(RDSDatabase.name)
    )
    cluster_opt = subqueryload(RDSSchema.cluster).options(
        load_only(RDSCluster.name, RDSCluster.database_rk),
        database_opt
    )
    schema_opt = subqueryload(RDSTable.schema).options(
        load_only(RDSSchema.name, RDSSchema.cluster_rk),
        cluster_opt
    )

    def summarize(row: Any) -> TableSummary:
        # Flatten one table row and its loaded relations into a summary.
        row_schema = row.schema
        row_cluster = row_schema.cluster
        row_description = row.description
        return TableSummary(database=row_cluster.database.name,
                            cluster=row_cluster.name,
                            schema=row_schema.name,
                            name=row.name,
                            description=row_description.description if row_description else None)

    with self.client.create_session() as session:
        rows = (session.query(RDSTable)
                .filter(RDSTable.rk.in_(table_uris))
                .options(description_opt)
                .options(schema_opt)
                .all())

        # Summaries are assembled while the session is still open.
        summaries = [summarize(row) for row in rows]

    return summaries

def _get_popular_dashboards(self, *, dashboard_uris: List[str]) -> List[DashboardSummary]:
    """
    Build a DashboardSummary for every dashboard whose key is listed in dashboard_uris.

    :param dashboard_uris: keys matched against RDSDashboard.rk
    :return: one DashboardSummary per dashboard found; an empty list when dashboard_uris is empty
    """
    if not dashboard_uris:
        return []

    # Loader options for description and executions (only the columns read below).
    description_opt = subqueryload(RDSDashboard.description).options(
        load_only(RDSDashboardDescription.description)
    )
    execution_opt = subqueryload(RDSDashboard.execution).options(
        load_only(RDSDashboardExecution.rk, RDSDashboardExecution.timestamp)
    )
    # Loader options for group and the cluster it belongs to.
    group_opt = subqueryload(RDSDashboard.group).options(
        subqueryload(RDSDashboardGroup.cluster).options(
            load_only(RDSDashboardCluster.name)
        )
    )

    with self.client.create_session() as session:
        rows = (session.query(RDSDashboard)
                .filter(RDSDashboard.rk.in_(dashboard_uris))
                .options(description_opt, execution_opt)
                .options(group_opt)
                .all())

        # Summaries are assembled while the session is still open.
        summaries = []
        for row in rows:
            # The product is the part of the dashboard key before the first underscore.
            product_name = row.rk.split('_', 1)[0]
            executions = row.execution
            row_description = row.description
            row_group = row.group
            row_cluster = row_group.cluster

            # First execution record whose key marks the last successful run, if any.
            last_success = None
            for candidate in executions:
                if candidate.rk.endswith('_last_successful_execution'):
                    last_success = candidate
                    break

            if last_success:
                last_run_ts = last_success.timestamp
            else:
                last_run_ts = None

            summaries.append(
                DashboardSummary(uri=row.rk,
                                 cluster=row_cluster.name,
                                 group_name=row_group.name,
                                 group_url=row_group.dashboard_group_url,
                                 product=product_name,
                                 name=row.name,
                                 url=row.dashboard_url,
                                 description=row_description.description if row_description else None,
                                 last_successful_run_timestamp=last_run_ts)
            )

    return summaries

@timer_with_counter
def get_latest_updated_ts(self) -> Optional[int]:
Expand Down Expand Up @@ -838,9 +997,6 @@ def get_dashboard_by_user_relation(self, *, user_email: str, relation_type: User

query = session.query(RDSDashboard).filter(RDSDashboard.rk.in_(dashboard_subquery)).options(
subqueryload(RDSDashboard.group).options(
subqueryload(RDSDashboardGroup.description).options(
load_only(RDSDashboardGroupDescription.description)
),
subqueryload(RDSDashboardGroup.cluster).options(
load_only(RDSDashboardCluster.name)
)
Expand Down Expand Up @@ -1253,9 +1409,6 @@ def get_resources_using_table(self, *, id: str, resource_type: ResourceType) ->

query = query.options(
subqueryload(RDSDashboard.group).options(
subqueryload(RDSDashboardGroup.description).options(
load_only(RDSDashboardGroupDescription.description)
),
subqueryload(RDSDashboardGroup.cluster).options(
load_only(RDSDashboardCluster.name)
)
Expand Down Expand Up @@ -1312,9 +1465,3 @@ def delete_resource_owner(self, *, uri: str, resource_type: ResourceType, owner:

def get_resource_generation_code(self, *, uri: str, resource_type: ResourceType) -> GenerationCode:
pass

def get_popular_resources(self, *,
num_entries: int,
resource_types: List[str],
user_id: Optional[str] = None) -> Dict[str, List]:
raise NotImplementedError
Loading

0 comments on commit e491c1f

Please sign in to comment.