Adding CSV, JSON and include path in mounts (#1329)
## Changes
Adds CSV and JSON support to TablesInMounts.
Adds an include_paths_in_mount parameter to crawl only a specific list of
paths across all mounts (see the usage sketch below).
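
For reference, a minimal usage sketch of the new option, mirroring test_mount_include_paths further down; the `backend` and `ws` wiring here is illustrative and not part of this change:

```python
from databricks.labs.ucx.hive_metastore.locations import Mounts, TablesInMounts

# Illustrative wiring: `backend` is a SqlBackend and `ws` a WorkspaceClient,
# exactly as in the unit tests below.
mounts = Mounts(backend, ws, "test")
tables_in_mounts = TablesInMounts(
    backend,
    ws,
    "test",
    mounts,
    include_paths_in_mount=["dbfs:/mnt/test_mount/table2/"],  # crawl only these paths
)
results = tables_in_mounts.snapshot()  # Table rows with DELTA/PARQUET/CSV/JSON formats
```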
william-conti authored Apr 8, 2024
1 parent 9e0c1b5 commit fc1747f
Showing 3 changed files with 138 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/config.py
@@ -51,6 +51,7 @@ class WorkspaceConfig: # pylint: disable=too-many-instance-attributes
     # If not specified, it will list all mounts.
     include_mounts: list[str] | None = None
     exclude_paths_in_mount: list[str] | None = None
+    include_paths_in_mount: list[str] | None = None
 
     def replace_inventory_variable(self, text: str) -> str:
         return text.replace("$inventory", f"hive_metastore.{self.inventory_database}")
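
A minimal sketch of setting the new field programmatically; hypothetical construction, since WorkspaceConfig takes more fields than this hunk shows:

```python
from databricks.labs.ucx.config import WorkspaceConfig

# Hypothetical: only inventory_database and the new field are shown here;
# the dataclass has many more attributes (pylint: too-many-instance-attributes).
cfg = WorkspaceConfig(
    inventory_database="ucx",
    include_paths_in_mount=["dbfs:/mnt/test_mount/table2/"],  # new in this commit
)
```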
71 changes: 51 additions & 20 deletions src/databricks/labs/ucx/hive_metastore/locations.py
@@ -12,7 +12,7 @@
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import NotFound
 from databricks.sdk.service.catalog import ExternalLocationInfo
-
+from databricks.sdk.dbutils import FileInfo
 from databricks.labs.ucx.framework.crawlers import CrawlerBase, Result, ResultFn
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore.tables import Table
@@ -261,6 +261,7 @@ class TableInMount:
 
 
 class TablesInMounts(CrawlerBase[Table]):
+    TABLE_IN_MOUNT_DB = "mounted_"
 
     def __init__(
         self,
@@ -270,11 +271,14 @@ def __init__(
         mc: Mounts,
         include_mounts: list[str] | None = None,
         exclude_paths_in_mount: list[str] | None = None,
+        include_paths_in_mount: list[str] | None = None,
     ):
         super().__init__(backend, "hive_metastore", inventory_database, "tables", Table)
         self._dbutils = ws.dbutils
-        self._mc = mc
+        self._mounts_crawler = mc
         self._include_mounts = include_mounts
+        self._ws = ws
+        self._include_paths_in_mount = include_paths_in_mount
 
         irrelevant_patterns = {'_SUCCESS', '_committed_', '_started_'}
         if exclude_paths_in_mount:
@@ -305,24 +309,29 @@ def _append_records(self, items: Sequence[Table]):
     def _try_load(self) -> Iterable[Table]:
         """Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
         for row in self._fetch(
-            f"SELECT * FROM {escape_sql_identifier(self.full_name)} WHERE NOT STARTSWITH(database, 'mounted_')"
+            f"SELECT * FROM {escape_sql_identifier(self.full_name)} WHERE NOT STARTSWITH(database, '{self.TABLE_IN_MOUNT_DB}')"
         ):
             yield Table(*row)
 
     def _crawl(self):
-        all_mounts = self._mc.snapshot()
+        all_mounts = self._mounts_crawler.snapshot()
         all_tables = []
         for mount in all_mounts:
             if self._include_mounts and mount.name not in self._include_mounts:
                 logger.info(f"Filtering mount {mount.name}")
                 continue
-            table_paths = self._find_delta_log_folders(mount.name)
-            logger.info(f"Found {len(table_paths)} in mount {mount.name}")
+            table_paths = {}
+            if self._include_paths_in_mount:
+                for path in self._include_paths_in_mount:
+                    table_paths.update(self._find_delta_log_folders(path))
+            else:
+                table_paths = self._find_delta_log_folders(mount.name)
+
             for path, entry in table_paths.items():
                 guess_table = os.path.basename(path)
                 table = Table(
                     catalog="hive_metastore",
-                    database=f"mounted_{mount.name.replace('/mnt/', '').replace('/', '_')}",
+                    database=f"{self.TABLE_IN_MOUNT_DB}{mount.name.replace('/mnt/', '').replace('/', '_')}",
                     name=guess_table,
                     object_type="EXTERNAL",
                     table_format=entry.format,
@@ -332,7 +341,7 @@ def _crawl(self):
                 all_tables.append(table)
         return all_tables
 
-    def _find_delta_log_folders(self, root_dir, delta_log_folders=None) -> dict:
+    def _find_delta_log_folders(self, root_dir: str, delta_log_folders=None) -> dict:
         if delta_log_folders is None:
             delta_log_folders = {}
         logger.info(f"Listing {root_dir}")
@@ -351,23 +360,37 @@ def _find_delta_log_folders(self, root_dir: str, delta_log_folders=None) -> dict:
                 logger.debug(f"Path {file_info.path} was identified as Delta, skipping")
                 continue
 
-            if file_info.name == "_delta_log/":
-                logger.debug(f"Found delta table {root_path}")
-                if delta_log_folders.get(root_path) and delta_log_folders.get(root_path).is_partitioned:
-                    delta_log_folders[root_path] = TableInMount(format="DELTA", is_partitioned=True)
-                else:
-                    delta_log_folders[root_path] = TableInMount(format="DELTA", is_partitioned=False)
-            elif self._is_partitioned(file_info.name):
-                logger.debug(f"Found partitioned parquet {file_info.path}")
-                delta_log_folders[root_path] = TableInMount(format="PARQUET", is_partitioned=True)
-            elif self._is_parquet(file_info.name):
-                logger.debug(f"Found parquet {file_info.path}")
-                delta_log_folders[root_path] = TableInMount(format="PARQUET", is_partitioned=False)
+            table_in_mount = self._assess_path(file_info, delta_log_folders, root_path)
+            if table_in_mount:
+                delta_log_folders[root_path] = table_in_mount
             else:
                 self._find_delta_log_folders(file_info.path, delta_log_folders)
 
         return delta_log_folders
 
+    def _assess_path(
+        self, file_info: FileInfo, delta_log_folders: dict[str, Table], root_path: str
+    ) -> TableInMount | None:
+        if file_info.name == "_delta_log/":
+            logger.debug(f"Found delta table {root_path}")
+            if not delta_log_folders.get(root_path):
+                return TableInMount(format="DELTA", is_partitioned=False)
+            if delta_log_folders[root_path].is_partitioned:
+                return TableInMount(format="DELTA", is_partitioned=True)
+        if self._is_partitioned(file_info.name):
+            logger.debug(f"Found partitioned parquet {file_info.path}")
+            return TableInMount(format="PARQUET", is_partitioned=True)
+        if self._is_csv(file_info.name):
+            logger.debug(f"Found csv {file_info.path}")
+            return TableInMount(format="CSV", is_partitioned=False)
+        if self._is_json(file_info.name):
+            logger.debug(f"Found json {file_info.path}")
+            return TableInMount(format="JSON", is_partitioned=False)
+        if self._is_parquet(file_info.name):
+            logger.debug(f"Found parquet {file_info.path}")
+            return TableInMount(format="PARQUET", is_partitioned=False)
+        return None
+
     def _path_is_delta(self, delta_log_folders, path: str) -> bool:
         return delta_log_folders.get(path) and delta_log_folders.get(path).format == "DELTA"
 
@@ -378,5 +401,13 @@ def _is_parquet(self, file_name: str) -> bool:
         parquet_patterns = {'.parquet'}
         return any(pattern in file_name for pattern in parquet_patterns)
 
+    def _is_csv(self, file_name: str) -> bool:
+        csv_patterns = {'.csv'}
+        return any(pattern in file_name for pattern in csv_patterns)
+
+    def _is_json(self, file_name: str) -> bool:
+        json_patterns = {'.json'}
+        return any(pattern in file_name for pattern in json_patterns)
+
     def _is_irrelevant(self, file_name: str) -> bool:
         return any(pattern in file_name for pattern in self._fiter_paths)
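
To make the detection order concrete, here is an illustrative mapping of file names to the TableInMount that _assess_path would report. The listing is hypothetical, and the "year=2024/" entry assumes _is_partitioned matches Hive-style key=value folder names, which this diff does not show:

```python
from databricks.labs.ucx.hive_metastore.locations import TableInMount

# Expected per-file outcomes, in _assess_path's precedence order.
# A name matching none of the patterns yields None, and the crawler
# recurses into it if it is a folder.
expected = {
    "_delta_log/": TableInMount(format="DELTA", is_partitioned=False),
    "year=2024/": TableInMount(format="PARQUET", is_partitioned=True),  # assumed Hive-style match
    "data.csv": TableInMount(format="CSV", is_partitioned=False),       # new in this commit
    "data.json": TableInMount(format="JSON", is_partitioned=False),     # new in this commit
    "part-0.snappy.parquet": TableInMount(format="PARQUET", is_partitioned=False),
}
```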
86 changes: 86 additions & 0 deletions tests/unit/hive_metastore/test_locations.py
@@ -535,3 +535,89 @@ def my_side_effect(path, **_):
             is_partitioned=False,
         ),
     ]
+
+
+def test_mount_include_paths():
+    client = create_autospec(WorkspaceClient)
+
+    first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", "", "")
+    second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", "", "")
+    folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "")
+    folder_table2 = FileInfo("dbfs:/mnt/test_mount/table2/_SUCCESS", "_SUCCESS", "", "")
+    folder_table3 = FileInfo("dbfs:/mnt/test_mount/table2/1.snappy.parquet", "1.snappy.parquet", "", "")
+
+    def my_side_effect(path, **_):
+        if path == "/mnt/test_mount":
+            return [first_folder, second_folder]
+        if path == "dbfs:/mnt/test_mount/table1/":
+            return [folder_table1]
+        if path == "dbfs:/mnt/test_mount/table2/":
+            return [folder_table2, folder_table3]
+        return None
+
+    client.dbutils.fs.ls.side_effect = my_side_effect
+    backend = MockBackend(
+        rows={
+            'hive_metastore.test.tables': [],
+            'test.mounts': MOUNT_STORAGE[("/mnt/test_mount", "adls://bucket/")],
+        }
+    )
+    mounts = Mounts(backend, client, "test")
+    results = TablesInMounts(
+        backend, client, "test", mounts, include_paths_in_mount=["dbfs:/mnt/test_mount/table2/"]
+    ).snapshot()
+    assert results == [
+        Table("hive_metastore", "mounted_test_mount", "table2", "EXTERNAL", "PARQUET", "adls://bucket/table2"),
+    ]
+
+
+def test_mount_listing_csv_json():
+    client = create_autospec(WorkspaceClient)
+
+    first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", "", "")
+    second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", "", "")
+    second_folder_random_csv = FileInfo("dbfs:/mnt/test_mount/entity/domain/test.csv", "test.csv", "", "")
+    third_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/", "table1/", "", "")
+    first_json = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/some_jsons.json", "some_jsons.json", "", "")
+    second_json = FileInfo(
+        "dbfs:/mnt/test_mount/entity/domain/table1/some_other_jsons.json", "some_other_jsons.json", "", ""
+    )
+
+    def my_side_effect(path, **_):
+        if path == "/mnt/test_mount":
+            return [first_folder]
+        if path == "dbfs:/mnt/test_mount/entity/":
+            return [second_folder, second_folder_random_csv]
+        if path == "dbfs:/mnt/test_mount/entity/domain/":
+            return [third_folder]
+        if path == "dbfs:/mnt/test_mount/entity/domain/table1/":
+            return [first_json, second_json]
+        return None
+
+    client.dbutils.fs.ls.side_effect = my_side_effect
+    backend = MockBackend(
+        rows={
+            'hive_metastore.test.tables': [],
+            'test.mounts': MOUNT_STORAGE[("/mnt/test_mount", "adls://bucket/")],
+        }
+    )
+    mounts = Mounts(backend, client, "test")
+    results = TablesInMounts(backend, client, "test", mounts).snapshot()
+    assert results == [
+        Table(
+            "hive_metastore",
+            "mounted_test_mount",
+            "table1",
+            "EXTERNAL",
+            "JSON",
+            "adls://bucket/entity/domain/table1",
+        ),
+        Table(
+            "hive_metastore",
+            "mounted_test_mount",
+            "entity",
+            "EXTERNAL",
+            "CSV",
+            "adls://bucket/entity",
+        ),
+    ]
