-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add table in mount migration #1225
Changes from 24 commits
b5675f2
ce9d24b
0c12eaa
a1f35ba
362c547
9e6099c
64f74af
297fde6
e0a2953
32878be
0e70761
bc792c0
74fff05
e420e5f
c2b3714
a5fa67a
bd9b6e0
bfe0983
4aec2b1
d11467e
627c9ed
93428d2
d685971
e7422c6
f955b69
7e9615b
4a233e9
595181f
9ba50a4
a888162
c2026e1
b15c558
d1e7503
9669208
0d931c3
c47ad0e
a139a83
115db64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -279,6 +279,7 @@ def __init__( | |
self._include_mounts = include_mounts | ||
self._ws = ws | ||
self._include_paths_in_mount = include_paths_in_mount | ||
self._seen_tables: dict[str, str] = {} | ||
william-conti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
irrelevant_patterns = {'_SUCCESS', '_committed_', '_started_'} | ||
if exclude_paths_in_mount: | ||
|
@@ -293,6 +294,7 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn) -> list[Result]: | |
cached_results = [] | ||
try: | ||
cached_results = list(fetcher()) | ||
self._init_seen_tables(cached_results) | ||
william-conti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except NotFound: | ||
pass | ||
logger.debug(f"[{self.full_name}] crawling new batch for {self._table}") | ||
|
@@ -302,6 +304,12 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn) -> list[Result]: | |
self._append_records(loaded_records) | ||
return loaded_records | ||
|
||
def _init_seen_tables(self, loaded_records: Iterable[Table]):
    """Seed the location -> table-key map from previously cached crawl results.

    Records without a location are skipped, so only locatable tables can
    later be recognised as already-registered during the mount crawl.
    """
    # Single dict-comprehension update preserves the original per-record
    # assignment semantics, including "last record wins" on duplicate locations.
    self._seen_tables.update({rec.location: rec.key for rec in loaded_records if rec.location})
william-conti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def _append_records(self, items: Sequence[Table]): | ||
logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}") | ||
self._backend.save_table(self.full_name, items, Table, mode="overwrite") | ||
|
@@ -329,18 +337,41 @@ def _crawl(self): | |
|
||
for path, entry in table_paths.items(): | ||
guess_table = os.path.basename(path) | ||
table_location = self._get_table_location(mount, path) | ||
if table_location in self._seen_tables: | ||
logger.info( | ||
f"Path {table_location} is identified as a table in mount, but is present in current workspace as a registered table {self._seen_tables[table_location]}" | ||
) | ||
continue | ||
if path in self._seen_tables: | ||
logger.info( | ||
f"Path {table_location} is identified as a table in mount, but is present in current workspace as a registered table {self._seen_tables[path]}" | ||
william-conti marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
continue | ||
table = Table( | ||
catalog="hive_metastore", | ||
database=f"{self.TABLE_IN_MOUNT_DB}{mount.name.replace('/mnt/', '').replace('/', '_')}", | ||
name=guess_table, | ||
object_type="EXTERNAL", | ||
table_format=entry.format, | ||
location=path.replace(f"dbfs:{mount.name}/", mount.source), | ||
location=table_location, | ||
is_partitioned=entry.is_partitioned, | ||
) | ||
all_tables.append(table) | ||
logger.info(f"Found a total of {len(all_tables)} tables in mount points") | ||
return all_tables | ||
|
||
def _get_table_location(self, mount: Mount, path: str):
    """Translate a ``dbfs:/mnt/...`` path into the mount's underlying storage URI.

    There can be different cases for mounts:
    - Mount(name='/mnt/things/a', source='abfss://things@labsazurethings.dfs.core.windows.net/a')
    - Mount(name='/mnt/mount', source='abfss://container@dsss.net/')
    Both must return the complete source with a forward slash in the end.
    """
    prefix = f"dbfs:{mount.name}"
    # When the source already ends with "/", strip the separator from the dbfs
    # prefix too, so exactly one slash joins source and the remaining path.
    if not mount.source.endswith("/"):
        return path.replace(prefix, mount.source)
    return path.replace(prefix + "/", mount.source)
|
||
def _find_delta_log_folders(self, root_dir: str, delta_log_folders=None) -> dict: | ||
if delta_log_folders is None: | ||
delta_log_folders = {} | ||
|
@@ -371,7 +402,8 @@ def _find_delta_log_folders(self, root_dir: str, delta_log_folders=None) -> dict | |
def _assess_path( | ||
self, file_info: FileInfo, delta_log_folders: dict[str, Table], root_path: str | ||
) -> TableInMount | None: | ||
if file_info.name == "_delta_log/": | ||
# Depending on the execution runtime: with the SDK, dbutils.fs.list returns _delta_log, while a cluster returns _delta_log/ | ||
if file_info.name in {"_delta_log/", "_delta_log"}: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not for this line, but for lines below: if self._is_partitioned(file_info.name):
logger.debug(f"Found partitioned parquet {file_info.path}")
return TableInMount(format="PARQUET", is_partitioned=True) Isn't JSON and CSV tables could be partitioned too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be, but I'll like to remain those out of scope for the moment and handle those in a separate PR There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But the issue for this PR seems to be: if it sees There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, in this case it's a bug and the TablesInMounts logic has to be pretty much rewritten to handle those small cases. Currently the logic handles partitioned Delta and Parquet, and by default partitioned CSVs and JSONs would be identified as partitioned. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
logger.debug(f"Found delta table {root_path}") | ||
if not delta_log_folders.get(root_path): | ||
return TableInMount(format="DELTA", is_partitioned=False) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,13 +27,25 @@ def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext): | |
ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_DELTA, acl_strategy=[AclMigrationWhat.LEGACY_TACL]) | ||
|
||
|
||
class ScanTablesInMounts(Workflow):
    """Experimental workflow that discovers tables stored inside mount points."""

    def __init__(self):
        super().__init__('scan-tables-in-mounts-experimental')

    @job_task
    def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
        """[EXPERIMENTAL] This workflow scans for Delta tables inside all mount points
        captured during the assessment. It will store the results under the `tables` table
        located under the assessment."""
        ctx.tables_in_mounts.snapshot()
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please integration test TablesInMounts as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's actually very hard to integration test TablesInMounts because
This code is made to handle the behaviour of a Spark cluster as much as possible. Additionally, some utility methods are missing in the SDK, like isDir(). @nfx FYI |
||
|
||
|
||
class MigrateTablesInMounts(Workflow):
    """Experimental workflow that migrates previously scanned tables in mounts."""

    def __init__(self):
        super().__init__('migrate-tables-in-mounts-experimental')

    @job_task(
        job_cluster="table_migration",
        depends_on=[ScanTablesInMounts.scan_tables_in_mounts_experimental],
    )
    def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
        """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
        # Scanning must have populated the inventory first (see depends_on above).
        acl_strategy = [AclMigrationWhat.DEFAULT_TABLE_OWNER]
        ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT, acl_strategy=acl_strategy)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`migrate_tables_in_mounts_experimental` also depends on the `create-table-mapping` command, and `create-table-mapping` needs `scan_tables_in_mounts_experimental` so it can map the tables in mounts. The correct order is:
1. assessment
2. scan_tables_in_mounts_experimental
3. create-table-mapping cli command
4. migrate_tables_in_mounts_experimental
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After testing it somehow needs to have azure_storage_account_info.csv to work ...