Removed redundant `pyspark`, `databricks-connect`, `delta-spark`, and `pandas` dependencies (#193)

This PR removes the redundant pyspark, databricks-connect, delta-spark, and pandas dependencies and their usages. With them gone, we can use consistent crawlers across HMS Crawling and Workspace Permissions. This PR supersedes and closes #105.
Commit bb5d051 (1 parent: 6179ba8)
Showing 12 changed files with 95 additions and 204 deletions.
src/databricks/labs/ucx/inventory/permissions_inventory.py (49 changes: 15 additions & 34 deletions)
@@ -1,50 +1,31 @@
 import logging
 
-from databricks.sdk import WorkspaceClient
-
 from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
-from databricks.labs.ucx.providers.spark import SparkMixin
+from databricks.labs.ucx.tacl._internal import CrawlerBase, SqlBackend
 
 logger = logging.getLogger(__name__)
 
 
-class PermissionsInventoryTable(SparkMixin):
-    def __init__(self, inventory_database: str, ws: WorkspaceClient):
-        super().__init__(ws)
-        self._table = f"hive_metastore.{inventory_database}.permissions"
-
-    @property
-    def _table_schema(self):
-        from pyspark.sql.types import StringType, StructField, StructType
-
-        return StructType(
-            [
-                StructField("object_id", StringType(), True),
-                StructField("support", StringType(), True),
-                StructField("raw_object_permissions", StringType(), True),
-            ]
-        )
-
-    @property
-    def _df(self):
-        return self.spark.table(self._table)
+class PermissionsInventoryTable(CrawlerBase):
+    def __init__(self, backend: SqlBackend, inventory_database: str):
+        super().__init__(backend, "hive_metastore", inventory_database, "permissions")
 
     def cleanup(self):
-        logger.info(f"Cleaning up inventory table {self._table}")
-        self.spark.sql(f"DROP TABLE IF EXISTS {self._table}")
+        logger.info(f"Cleaning up inventory table {self._full_name}")
+        self._exec(f"DROP TABLE IF EXISTS {self._full_name}")
         logger.info("Inventory table cleanup complete")
 
     def save(self, items: list[PermissionsInventoryItem]):
         # TODO: update instead of append
-        logger.info(f"Saving {len(items)} items to inventory table {self._table}")
-        serialized_items = [item.as_dict() for item in items]
-        df = self.spark.createDataFrame(serialized_items, schema=self._table_schema)
-        df.write.mode("append").format("delta").saveAsTable(self._table)
+        logger.info(f"Saving {len(items)} items to inventory table {self._full_name}")
+        self._append_records(PermissionsInventoryItem, items)
         logger.info("Successfully saved the items to inventory table")
 
     def load_all(self) -> list[PermissionsInventoryItem]:
-        logger.info(f"Loading inventory table {self._table}")
-        df = self._df.toPandas()
-
-        logger.info("Successfully loaded the inventory table")
-        return PermissionsInventoryItem.from_pandas(df)
+        logger.info(f"Loading inventory table {self._full_name}")
+        return [
+            PermissionsInventoryItem(object_id, support, raw_object_permissions)
+            for object_id, support, raw_object_permissions in self._fetch(
+                f"SELECT object_id, support, raw_object_permissions FROM {self._full_name}"
+            )
+        ]
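The `CrawlerBase` and `SqlBackend` internals are not part of this diff. For context, the following is a minimal, self-contained sketch of the pattern the new class relies on: dataclass rows appended and fetched through a plain SQL backend instead of Spark DataFrames and pandas. The `SqliteBackend` and `PermissionsInventory` names below are illustrative stand-ins, not the actual ucx implementation.

import dataclasses
import sqlite3


@dataclasses.dataclass
class PermissionsInventoryItem:
    object_id: str
    support: str
    raw_object_permissions: str


class SqliteBackend:
    """Illustrative stand-in for a SqlBackend: executes statements and fetches rows."""

    def __init__(self):
        self._conn = sqlite3.connect(":memory:")

    def execute(self, sql: str) -> None:
        self._conn.execute(sql)

    def fetch(self, sql: str) -> list[tuple]:
        return self._conn.execute(sql).fetchall()


class PermissionsInventory:
    """Illustrative crawler-style table: saves and loads dataclass rows via SQL only."""

    def __init__(self, backend: SqliteBackend, full_name: str):
        self._backend = backend
        self._full_name = full_name
        columns = ", ".join(f.name for f in dataclasses.fields(PermissionsInventoryItem))
        backend.execute(f"CREATE TABLE IF NOT EXISTS {full_name} ({columns})")

    def save(self, items: list[PermissionsInventoryItem]) -> None:
        # Append each dataclass instance as one row; no DataFrame involved.
        for item in items:
            values = ", ".join(repr(v) for v in dataclasses.astuple(item))
            self._backend.execute(f"INSERT INTO {self._full_name} VALUES ({values})")

    def load_all(self) -> list[PermissionsInventoryItem]:
        rows = self._backend.fetch(
            f"SELECT object_id, support, raw_object_permissions FROM {self._full_name}"
        )
        return [PermissionsInventoryItem(*row) for row in rows]


if __name__ == "__main__":
    inventory = PermissionsInventory(SqliteBackend(), "permissions")
    inventory.save([PermissionsInventoryItem("abc", "bcd", "def")])
    assert inventory.load_all() == [PermissionsInventoryItem("abc", "bcd", "def")]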
New file (23 additions)

@@ -0,0 +1,23 @@
+import os
+
+from databricks.labs.ucx.inventory.permissions_inventory import (
+    PermissionsInventoryTable,
+)
+from databricks.labs.ucx.inventory.types import PermissionsInventoryItem
+from databricks.labs.ucx.tacl._internal import StatementExecutionBackend
+
+
+def test_permissions_save_and_load(ws, make_schema):
+    schema = make_schema().split(".")[-1]
+    backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])
+    pi = PermissionsInventoryTable(backend, schema)
+
+    saved = [
+        PermissionsInventoryItem(object_id="abc", support="bcd", raw_object_permissions="def"),
+        PermissionsInventoryItem(object_id="efg", support="fgh", raw_object_permissions="ghi"),
+    ]
+
+    pi.save(saved)
+    loaded = pi.load_all()
+
+    assert saved == loaded
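The `ws` and `make_schema` fixtures used in this test come from the project's test conftest and are not shown in this excerpt. Below is a rough sketch of what such fixtures could look like; the fixture bodies, and the assumption that `StatementExecutionBackend` exposes an `execute` method, are guesses rather than the actual ucx test setup.

import os
import uuid

import pytest
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.tacl._internal import StatementExecutionBackend


@pytest.fixture
def ws() -> WorkspaceClient:
    # Authenticates from the environment (DATABRICKS_HOST, DATABRICKS_TOKEN, ...).
    return WorkspaceClient()


@pytest.fixture
def make_schema(ws):
    # Assumption: the backend exposes execute(); the real fixture may create and
    # clean up test schemas differently.
    backend = StatementExecutionBackend(ws, os.environ["TEST_DEFAULT_WAREHOUSE_ID"])

    def inner() -> str:
        # Returns a fully qualified name such as "hive_metastore.ucx_s1a2b3c4";
        # the test above splits on "." and keeps only the bare schema name.
        name = f"ucx_s{uuid.uuid4().hex[:8]}"
        backend.execute(f"CREATE SCHEMA hive_metastore.{name}")
        return f"hive_metastore.{name}"

    return inner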