Added support for crawling grants and applying Hive Metastore UDF ACLs (#812)

## Changes
Added support for crawling grants and applying Hive Metastore UDF ACLs. A wiring sketch follows the checklists below.

Resolves #808 

### Functionality 

- [x] added new crawler for udfs
- [x] added handling of udfs to the existing TableAclSupport
- [x] added new column to inventory permissions table for udfs

### Tests

- [x] tested manually
- [x] added unit tests
- [x] added integration tests
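
For orientation, a minimal wiring sketch of how the new UDF crawler plugs into grant crawling; it mirrors the `crawl_grants` task in this change, but `crawl_all_grants`, the `backend` argument and the `"ucx"` schema name are illustrative placeholders, not part of the PR:

```python
from databricks.labs.ucx.framework.crawlers import SqlBackend
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler


def crawl_all_grants(backend: SqlBackend, inventory_database: str = "ucx") -> None:
    """Crawl database/table/view/UDF grants into the inventory schema."""
    tables = TablesCrawler(backend, inventory_database)
    udfs = UdfsCrawler(backend, inventory_database)   # new crawler introduced in this PR
    grants = GrantsCrawler(tables, udfs)              # now takes the UDF crawler as well
    for grant in grants.snapshot():                   # FUNCTION grants are included
        object_type, object_key = grant.this_type_and_key()
        print(object_type, object_key, grant.action_type, grant.principal)
```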

---------

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
mwojtyczka and nfx authored Jan 18, 2024
1 parent af80620 commit c9ee832
Showing 16 changed files with 682 additions and 71 deletions.
31 changes: 25 additions & 6 deletions src/databricks/labs/ucx/hive_metastore/grants.py
@@ -9,6 +9,7 @@

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler

logger = logging.getLogger(__name__)

@@ -21,6 +22,7 @@ class Grant:
database: str | None = None
table: str | None = None
view: str | None = None
udf: str | None = None
any_file: bool = False
anonymous_function: bool = False

@@ -31,6 +33,7 @@ def type_and_key(
database: str | None = None,
table: str | None = None,
view: str | None = None,
udf: str | None = None,
any_file: bool = False,
anonymous_function: bool = False,
) -> tuple[str, str]:
@@ -42,6 +45,10 @@ def type_and_key(
catalog = "hive_metastore" if catalog is None else catalog
database = "default" if database is None else database
return "VIEW", f"{catalog}.{database}.{view}"
if udf is not None:
catalog = "hive_metastore" if catalog is None else catalog
database = "default" if database is None else database
return "FUNCTION", f"{catalog}.{database}.{udf}"
if database is not None:
catalog = "hive_metastore" if catalog is None else catalog
return "DATABASE", f"{catalog}.{database}"
@@ -53,7 +60,7 @@ def type_and_key(
if catalog is not None:
return "CATALOG", catalog
msg = (
f"invalid grant keys: catalog={catalog}, database={database}, view={view}, "
f"invalid grant keys: catalog={catalog}, database={database}, view={view}, udf={udf}"
f"any_file={any_file}, anonymous_function={anonymous_function}"
)
raise ValueError(msg)
@@ -69,6 +76,7 @@ def this_type_and_key(self):
database=self.database,
table=self.table,
view=self.view,
udf=self.udf,
any_file=self.any_file,
anonymous_function=self.anonymous_function,
)
@@ -135,9 +143,13 @@ def uc_grant_sql(self):


class GrantsCrawler(CrawlerBase[Grant]):
def __init__(self, tc: TablesCrawler):
def __init__(self, tc: TablesCrawler, udf: UdfsCrawler):
assert tc._backend == udf._backend
assert tc._catalog == udf._catalog
assert tc._schema == udf._schema
super().__init__(tc._backend, tc._catalog, tc._schema, "grants", Grant)
self._tc = tc
self._udf = udf

def snapshot(self) -> Iterable[Grant]:
return self._snapshot(partial(self._try_load), partial(self._crawl))
@@ -148,7 +160,7 @@ def _try_load(self):

def _crawl(self) -> Iterable[Grant]:
"""
Crawls and lists grants for all databases, tables, views, any file
Crawls and lists grants for all databases, tables, views, udfs, any file
and anonymous function within hive_metastore.
Returns:
@@ -159,12 +171,14 @@ def _crawl(self) -> Iterable[Grant]:
table/view-specific grants.
- Iterates through tables in the specified database using the `_tc.snapshot` method.
- For each table, adds tasks to fetch grants for the table or its view, depending on the kind of the table.
- Iterates through udfs in the specified database using the `_udf.snapshot` method.
- For each udf, adds tasks to fetch grants for the udf.
- Executes the tasks concurrently using Threads.gather.
- Flattens the list of retrieved grant lists into a single list of Grant objects.
Note:
- The method assumes that the `_grants` method fetches grants based on the provided parameters (catalog,
database, table, view, any file, anonymous function).
database, table, view, udfs, any file, anonymous function).
Returns:
list[Grant]: A list of Grant objects representing the grants found in hive_metastore.
@@ -181,6 +195,9 @@ def _crawl(self) -> Iterable[Grant]:
fn = partial(self._grants, catalog=catalog, database=table.database)
# views are recognized as tables
tasks.append(partial(fn, table=table.name))
for udf in self._udf.snapshot():
fn = partial(self._grants, catalog=catalog, database=udf.database)
tasks.append(partial(fn, udf=udf.name))
catalog_grants, errors = Threads.gather(f"listing grants for {catalog}", tasks)
if len(errors) > 0:
raise ManyError(errors)
@@ -206,6 +223,7 @@ def _grants(
database: str | None = None,
table: str | None = None,
view: str | None = None,
udf: str | None = None,
any_file: bool = False,
anonymous_function: bool = False,
) -> list[Grant]:
@@ -217,6 +235,7 @@ def _grants(
database (str | None): The database name (optional).
table (str | None): The table name (optional).
view (str | None): The view name (optional).
udf (str | None): The udf name (optional).
any_file (bool): Whether to include any file grants (optional).
anonymous_function (bool): Whether to include anonymous function grants (optional).
@@ -245,13 +264,12 @@ def _grants(
database=self._try_valid(database),
table=self._try_valid(table),
view=self._try_valid(view),
udf=self._try_valid(udf),
any_file=any_file,
anonymous_function=anonymous_function,
)
try:
grants = []
# Added ANY FILE and ANONYMOUS FUNCTION in object_type_normalization
# to capture the same in grants. issue:#623
object_type_normalization = {
"SCHEMA": "DATABASE",
"CATALOG$": "CATALOG",
Expand All @@ -271,6 +289,7 @@ def _grants(
action_type=action_type,
table=table,
view=view,
udf=udf,
database=database,
catalog=catalog,
any_file=any_file,
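
To make the new `FUNCTION` branch in `Grant.type_and_key` concrete, a small hedged sketch (the principal, action and object names are invented for illustration):

```python
from databricks.labs.ucx.hive_metastore.grants import Grant

# A grant recorded for a Hive Metastore UDF.
grant = Grant(
    principal="analysts",   # illustrative group name
    action_type="SELECT",   # illustrative action
    catalog="hive_metastore",
    database="finance",
    udf="mask_ssn",
)

# The udf field makes type_and_key classify the object as a FUNCTION
# with a three-part key, exactly as the branch added above returns.
assert grant.this_type_and_key() == ("FUNCTION", "hive_metastore.finance.mask_ssn")
```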
104 changes: 104 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/udfs.py
@@ -0,0 +1,104 @@
import logging
from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from functools import partial

from databricks.labs.blueprint.parallel import Threads

from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
from databricks.labs.ucx.mixins.sql import Row

logger = logging.getLogger(__name__)


@dataclass
class Udf:
catalog: str
database: str
name: str
func_type: str
func_input: str
func_returns: str
deterministic: bool
data_access: str
body: str
comment: str = ""

@property
def key(self) -> str:
return f"{self.catalog}.{self.database}.{self.name}".lower()


class UdfsCrawler(CrawlerBase):
def __init__(self, backend: SqlBackend, schema):
"""
Initializes a UdfsCrawler instance.
Args:
backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark)
schema: The schema name for the inventory persistence.
"""
super().__init__(backend, "hive_metastore", schema, "udfs", Udf)

def _all_databases(self) -> Iterator[Row]:
yield from self._fetch("SHOW DATABASES")

def snapshot(self) -> list[Udf]:
"""
Takes a snapshot of udfs in the specified catalog and database.
Returns:
list[Udf]: A list of Udf objects representing the snapshot of udfs.
"""
return self._snapshot(self._try_load, self._crawl)

def _try_load(self) -> Iterable[Udf]:
"""Tries to load udf information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
for row in self._fetch(f"SELECT * FROM {self._full_name}"):
yield Udf(*row)

def _crawl(self) -> Iterable[Udf]:
"""Crawls and lists udfs within the specified catalog and database."""
tasks = []
catalog = "hive_metastore"
# need to set the current catalog, otherwise "SHOW USER FUNCTIONS FROM" raises the error:
# "target schema <database> is not in the current catalog"
self._exec(f"USE CATALOG {catalog};")
for (database,) in self._all_databases():
logger.debug(f"[{catalog}.{database}] listing udfs")
for (udf,) in self._fetch(f"SHOW USER FUNCTIONS FROM {catalog}.{database};"):
if udf.startswith(f"{catalog}.{database}"):
udf_name = udf[udf.rfind(".") + 1 :] # remove catalog and database info from the name
tasks.append(partial(self._describe, catalog, database, udf_name))
catalog_tables, errors = Threads.gather(f"listing udfs in {catalog}", tasks)
if len(errors) > 0:
logger.error(f"Detected {len(errors)} while scanning udfs in {catalog}")
return catalog_tables

def _describe(self, catalog: str, database: str, udf: str) -> Udf | None:
"""Fetches metadata like udf type, input, returns, data access and body
if specified for a specific udf within the given catalog and database.
"""
full_name = f"{catalog}.{database}.{udf}"
try:
logger.debug(f"[{full_name}] fetching udf metadata")
describe = {}
for key_value in self._fetch(f"DESCRIBE FUNCTION EXTENDED {full_name}"):
if ":" in key_value: # skip free text configs that don't have a key
key, value = key_value.split(":", 1)
describe[key] = value.strip()
return Udf(
catalog=catalog.lower(),
database=database.lower(),
name=udf.lower(),
func_type=describe.get("Type", "UNKNOWN"),
func_input=describe.get("Input", "UNKNOWN"),
func_returns=describe.get("Returns", "UNKNOWN"),
deterministic=describe.get("Deterministic", False),
data_access=describe.get("Type", "UNKNOWN"),
comment=describe.get("Comment", "UNKNOWN"),
body=describe.get("Body", "UNKNOWN"),
)
except Exception as e:
logger.error(f"Couldn't fetch information for udf {full_name} : {e}")
return None
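
A short usage sketch for the new crawler; `backend` is any `SqlBackend` implementation, and `list_hms_udfs` with the `"ucx"` schema name are illustrative, not part of this change:

```python
from databricks.labs.ucx.framework.crawlers import SqlBackend
from databricks.labs.ucx.hive_metastore.udfs import Udf, UdfsCrawler


def list_hms_udfs(backend: SqlBackend, inventory_schema: str = "ucx") -> list[Udf]:
    """Snapshot all user-defined functions in hive_metastore into the inventory."""
    crawler = UdfsCrawler(backend, inventory_schema)
    udfs = crawler.snapshot()
    for udf in udfs:
        # key is "<catalog>.<database>.<name>", lower-cased, per Udf.key above
        print(udf.key, udf.func_returns, udf.data_access)
    return udfs
```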
42 changes: 41 additions & 1 deletion src/databricks/labs/ucx/mixins/fixtures.py
@@ -15,12 +15,13 @@
import pytest
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.core import DatabricksError
from databricks.sdk.errors import ResourceConflict
from databricks.sdk.errors import NotFound, ResourceConflict
from databricks.sdk.retries import retried
from databricks.sdk.service import compute, iam, jobs, pipelines, sql, workspace
from databricks.sdk.service.catalog import (
CatalogInfo,
DataSourceFormat,
FunctionInfo,
SchemaInfo,
TableInfo,
TableType,
@@ -1023,6 +1024,45 @@ def remove(table_info: TableInfo):
yield from factory("table", create, remove)


@pytest.fixture
def make_udf(sql_backend, make_schema, make_random) -> Generator[Callable[..., FunctionInfo], None, None]:
def create(
*, catalog_name="hive_metastore", schema_name: str | None = None, name: str | None = None
) -> FunctionInfo:
if schema_name is None:
schema = make_schema(catalog_name=catalog_name)
catalog_name = schema.catalog_name
schema_name = schema.name

if name is None:
name = f"ucx_T{make_random(4)}".lower()

full_name = f"{catalog_name}.{schema_name}.{name}".lower()
ddl = f"CREATE FUNCTION {full_name}(x INT) RETURNS FLOAT CONTAINS SQL DETERMINISTIC RETURN 0;"

sql_backend.execute(ddl)
udf_info = FunctionInfo(
catalog_name=catalog_name,
schema_name=schema_name,
name=name,
full_name=full_name,
)

logger.info(f"Function {udf_info.full_name} crated")
return udf_info

def remove(udf_info: FunctionInfo):
try:
sql_backend.execute(f"DROP FUNCTION IF EXISTS {udf_info.full_name}")
except NotFound as e:
if "SCHEMA_NOT_FOUND" in str(e):
logger.warning("Schema was already dropped while executing the test", exc_info=e)
else:
raise e

yield from factory("table", create, remove)


@pytest.fixture
def make_query(ws, make_table, make_random):
def create() -> QueryInfo:
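
A hedged sketch of how the new `make_udf` fixture could be exercised in an integration test; the `inventory_schema` and `make_group` fixtures, the GRANT statement and the assertion are illustrative rather than taken from this change:

```python
from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler
from databricks.labs.ucx.hive_metastore.tables import TablesCrawler
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler


def test_udf_grants_are_crawled(sql_backend, inventory_schema, make_udf, make_group):
    udf = make_udf()      # throw-away hive_metastore UDF from the new fixture
    group = make_group()  # assumed group fixture

    # Grant a privilege on the function so the crawler has something to pick up.
    sql_backend.execute(f"GRANT SELECT ON FUNCTION {udf.full_name} TO `{group.display_name}`")

    tables = TablesCrawler(sql_backend, inventory_schema)
    udfs = UdfsCrawler(sql_backend, inventory_schema)
    grants = GrantsCrawler(tables, udfs)

    assert any(g.udf == udf.name and g.action_type == "SELECT" for g in grants.snapshot())
```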
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/queries/views/grant_detail.sql
@@ -6,6 +6,7 @@ SELECT
WHEN table IS NOT NULL THEN 'TABLE'
WHEN database IS NOT NULL THEN 'DATABASE'
WHEN catalog IS NOT NULL THEN 'CATALOG'
WHEN udf IS NOT NULL THEN 'UDF'
ELSE 'UNKNOWN'
END AS object_type,
CASE
@@ -15,6 +16,7 @@ SELECT
WHEN table IS NOT NULL THEN CONCAT(catalog, '.', database, '.', table)
WHEN database IS NOT NULL THEN CONCAT(catalog, '.', database)
WHEN catalog IS NOT NULL THEN catalog
WHEN udf IS NOT NULL THEN CONCAT(catalog, '.', database, '.', udf)
ELSE 'UNKNOWN'
END AS object_id,
action_type,
@@ -28,5 +30,6 @@ SELECT
principal,
catalog,
database,
table
table,
udf
FROM $inventory.grants where database != split("$inventory",'[.]')[1]
4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/runtime.py
@@ -19,6 +19,7 @@
TablesCrawler,
)
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
from databricks.labs.ucx.workspace_access.generic import WorkspaceListing
from databricks.labs.ucx.workspace_access.groups import GroupManager
from databricks.labs.ucx.workspace_access.manager import PermissionManager
@@ -53,7 +54,8 @@ def crawl_grants(cfg: WorkspaceConfig):
ACLs enabled and available for retrieval."""
backend = RuntimeBackend()
tables = TablesCrawler(backend, cfg.inventory_database)
grants = GrantsCrawler(tables)
udfs = UdfsCrawler(backend, cfg.inventory_database)
grants = GrantsCrawler(tables, udfs)
grants.snapshot()


4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/workspace_access/manager.py
@@ -15,6 +15,7 @@
SqlBackend,
)
from databricks.labs.ucx.hive_metastore import GrantsCrawler, TablesCrawler
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
from databricks.labs.ucx.workspace_access import generic, redash, scim, secrets
from databricks.labs.ucx.workspace_access.base import AclSupport, Permissions
from databricks.labs.ucx.workspace_access.groups import MigrationState
@@ -71,7 +72,8 @@ def factory(
secrets_support = secrets.SecretScopesSupport(ws)
scim_support = scim.ScimSupport(ws)
tables_crawler = TablesCrawler(sql_backend, inventory_database)
grants_crawler = GrantsCrawler(tables_crawler)
udfs_crawler = UdfsCrawler(sql_backend, inventory_database)
grants_crawler = GrantsCrawler(tables_crawler, udfs_crawler)
tacl_support = TableAclSupport(grants_crawler, sql_backend)
return cls(
sql_backend, inventory_database, [generic_support, sql_support, secrets_support, scim_support, tacl_support]
5 changes: 4 additions & 1 deletion src/databricks/labs/ucx/workspace_access/tacl.py
@@ -65,6 +65,9 @@ def _from_reduced(self, object_type: str, object_id: str, principal: str, action
case "CATALOG":
catalog = object_id
return Grant(principal=principal, action_type=action_type, catalog=catalog)
case "FUNCTION":
catalog, database, udf = object_id.split(".")
return Grant(principal=principal, action_type=action_type, catalog=catalog, database=database, udf=udf)
case "ANONYMOUS FUNCTION":
catalog = object_id
return Grant(principal=principal, action_type=action_type, catalog=catalog, anonymous_function=True)
@@ -73,7 +76,7 @@ def object_types(self) -> set[str]:
return Grant(principal=principal, action_type=action_type, catalog=catalog, any_file=True)

def object_types(self) -> set[str]:
return {"TABLE", "DATABASE", "VIEW", "CATALOG", "ANONYMOUS FUNCTION", "ANY FILE"}
return {"TABLE", "DATABASE", "VIEW", "CATALOG", "FUNCTION", "ANONYMOUS FUNCTION", "ANY FILE"}

def get_apply_task(self, item: Permissions, migration_state: MigrationState):
grant = Grant(**json.loads(item.raw))
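
To show what the new `FUNCTION` case does when permissions are applied, a small hedged round-trip sketch (the object id and principal are illustrative):

```python
from databricks.labs.ucx.hive_metastore.grants import Grant

# The FUNCTION case above splits the stored object id back into its parts...
object_id = "hive_metastore.finance.mask_ssn"
catalog, database, udf = object_id.split(".")
grant = Grant(principal="analysts", action_type="SELECT",
              catalog=catalog, database=database, udf=udf)

# ...and the reconstructed grant maps back to the same FUNCTION object.
assert grant.this_type_and_key() == ("FUNCTION", object_id)
```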