Migrate non-delta DBFS tables using Create Table As Select (CTAS). Convert such tables to Delta tables. #1434

Merged
merged 10 commits on Apr 18, 2024
24 changes: 20 additions & 4 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -10,7 +10,7 @@
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient


from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler, PrincipalACL
from databricks.labs.ucx.hive_metastore.mapping import (
@@ -141,6 +141,8 @@ def _migrate_table(
return True
if src_table.src.what == What.DBFS_ROOT_DELTA:
return self._migrate_dbfs_root_table(src_table.src, src_table.rule, grants)
if src_table.src.what == What.DBFS_ROOT_NON_DELTA:
return self._migrate_table_create_ctas(src_table.src, src_table.rule, grants)
if src_table.src.what == What.EXTERNAL_SYNC:
return self._migrate_external_table(src_table.src, src_table.rule, grants)
if src_table.src.what == What.EXTERNAL_NO_SYNC:
@@ -210,22 +212,29 @@ def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Gr

def _migrate_non_sync_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
table_migrate_sql = self._get_create_in_place_sql(src_table, rule)
logger.debug(f"Migrating table (CTAS) {src_table.key} to using SQL query: {table_migrate_sql}")
logger.debug(f"Migrating table (No Sync) {src_table.key} to using SQL query: {table_migrate_sql}")
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
return self._migrate_acl(src_table, rule, grants)

def _migrate_table_create_ctas(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
table_migrate_sql = self._get_create_ctas_sql(src_table, rule)
logger.debug(f"Migrating table (Create Like) {src_table.key} to using SQL query: {table_migrate_sql}")
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
return self._migrate_acl(src_table, rule, grants)

def _get_create_in_place_sql(self, src_table: Table, rule: Rule) -> str:
create_sql = str(next(self._backend.fetch(src_table.sql_show_create())).get("createtab_stmt"))
create_sql = str(next(self._backend.fetch(src_table.sql_show_create()))["createtab_stmt"])
statements = sqlglot.parse(create_sql, read='databricks')
assert len(statements) == 1, 'Expected a single statement'
create = statements[0]
assert isinstance(create, expressions.Create), 'Expected a CREATE statement'
# safely replace current table name with the updated catalog
for table_name in create.find_all(expressions.Table):
if table_name.db == src_table.database and table_name.name == src_table.name:
# See https://github.com/tobymao/sqlglot/issues/3311
new_table_name = expressions.Table(
catalog=rule.catalog_name,
db=rule.dst_schema,
@@ -236,6 +245,13 @@ def _get_create_in_place_sql(self, src_table: Table, rule: Rule) -> str:
create.args['exists'] = True
return create.sql('databricks')

def _get_create_ctas_sql(self, src_table: Table, rule: Rule) -> str:
create_sql = (
f"CREATE TABLE IF NOT EXISTS {escape_sql_identifier(rule.as_uc_table_key)} "
f"AS SELECT * FROM {src_table.safe_sql_key}"
)
return create_sql

def _migrate_acl(self, src: Table, rule: Rule, grants: list[Grant] | None):
if grants is None:
return True
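
Below is a minimal sketch, not part of this diff, of the statement sequence the new _migrate_table_create_ctas path issues, assuming a hypothetical mapping of hive_metastore.db1_src.managed_dbfs to ucx_default.db1_dst.managed_dbfs (the same names the unit test further down asserts on):

from databricks.labs.ucx.hive_metastore.tables import Table

workspace_id = 12345  # assumed workspace id, matching the unit test below
statements = [
    # 1. CTAS copies the non-delta DBFS data into a managed Delta table in Unity Catalog.
    "CREATE TABLE IF NOT EXISTS ucx_default.db1_dst.managed_dbfs "
    "AS SELECT * FROM hive_metastore.db1_src.managed_dbfs",
    # 2. Mark the legacy Hive metastore table as upgraded.
    "ALTER TABLE hive_metastore.db1_src.managed_dbfs "
    "SET TBLPROPERTIES ('upgraded_to' = 'ucx_default.db1_dst.managed_dbfs');",
    # 3. Record the source table and originating workspace id on the new table.
    f"ALTER TABLE ucx_default.db1_dst.managed_dbfs "
    f"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_dbfs' , "
    f"'{Table.UPGRADED_FROM_WS_PARAM}' = '{workspace_id}');",
]

Unlike the in-place path, no SHOW CREATE TABLE round trip or sqlglot rewrite is needed; with no USING clause the CTAS target defaults to a managed Delta table, which is what converts the table to Delta.
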
16 changes: 16 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -123,6 +123,14 @@ def is_format_supported_for_sync(self) -> bool:
return False
return self.table_format.upper() in {"DELTA", "PARQUET", "CSV", "JSON", "ORC", "TEXT", "AVRO"}

@property
def is_format_supported_for_create_like(self) -> bool:
# Based on documentation
# https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-table-like.html
if self.table_format is None:
return False
return self.table_format.upper() in {"DELTA", "PARQUET", "CSV", "JSON", "TEXT"}

@property
def is_databricks_dataset(self) -> bool:
if not self.location:
@@ -151,6 +159,14 @@ def what(self) -> What:
def sql_migrate_external(self, target_table_key):
return f"SYNC TABLE {escape_sql_identifier(target_table_key)} FROM {escape_sql_identifier(self.key)};"

def sql_migrate_create_like(self, target_table_key):
# Create table as a copy of the source table
# https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-table-like.html
return (
f"CREATE TABLE IF NOT EXISTS {escape_sql_identifier(target_table_key)} LIKE "
f"{escape_sql_identifier(self.key)};"
)

def sql_migrate_dbfs(self, target_table_key):
if not self.is_delta:
msg = f"{self.key} is not DELTA: {self.table_format}"
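
The helpers added to tables.py also include sql_migrate_create_like and a matching is_format_supported_for_create_like allow-list, although the migration path above uses CTAS: CREATE TABLE LIKE copies only the table definition, while CTAS also copies the rows. An illustrative comparison for a hypothetical source/target pair, assuming escape_sql_identifier leaves simple dotted names unquoted (as the CTAS assertions in the unit test further down suggest):

# Not code from this PR; names are illustrative.
create_like_sql = (
    "CREATE TABLE IF NOT EXISTS ucx_default.db1_dst.managed_dbfs LIKE "
    "hive_metastore.db1_src.managed_dbfs;"  # copies only the definition, no rows
)
create_ctas_sql = (
    "CREATE TABLE IF NOT EXISTS ucx_default.db1_dst.managed_dbfs "
    "AS SELECT * FROM hive_metastore.db1_src.managed_dbfs"  # copies rows into a Delta table
)
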
10 changes: 7 additions & 3 deletions src/databricks/labs/ucx/mixins/fixtures.py
@@ -992,10 +992,14 @@ def create(
# temporary (if not view)
ddl = f"{ddl} AS {ctas}"
elif non_delta:
table_type = TableType.MANAGED # pylint: disable=redefined-variable-type
table_type = TableType.EXTERNAL # pylint: disable=redefined-variable-type
data_source_format = DataSourceFormat.JSON
storage_location = "dbfs:/databricks-datasets/iot-stream/data-device"
ddl = f"{ddl} USING json LOCATION '{storage_location}'"
storage_location = f"dbfs:/tmp/ucx_test_{make_random(4)}"
# Use a location under dbfs:/tmp, otherwise the table is identified as a Databricks dataset
ddl = (
f"{ddl} USING json location '{storage_location}' as SELECT * FROM "
f"JSON.`dbfs:/databricks-datasets/iot-stream/data-device`"
)
elif external_csv is not None:
table_type = TableType.EXTERNAL
data_source_format = DataSourceFormat.CSV
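
For reference, a rough sketch of the DDL the updated non_delta branch of the fixture now issues, under the assumption that the surrounding fixture code builds the prefix as CREATE TABLE <full name>; placing the data under dbfs:/tmp keeps the crawler from classifying the table as a Databricks dataset:

# Hypothetical helper mirroring the fixture logic above; the random suffix is stubbed.
def non_delta_test_table_ddl(full_name: str, suffix: str = "ab12") -> str:
    ddl = f"CREATE TABLE {full_name}"  # assumed prefix, built earlier in the fixture
    storage_location = f"dbfs:/tmp/ucx_test_{suffix}"
    return (
        f"{ddl} USING json location '{storage_location}' as SELECT * FROM "
        f"JSON.`dbfs:/databricks-datasets/iot-stream/data-device`"
    )

# e.g. non_delta_test_table_ddl("hive_metastore.some_schema.ucx_table")
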
28 changes: 28 additions & 0 deletions tests/integration/hive_metastore/test_migrate.py
@@ -54,6 +54,34 @@ def test_migrate_managed_tables(ws, sql_backend, runtime_ctx, make_catalog):
assert target_table_properties[Table.UPGRADED_FROM_WS_PARAM] == str(ws.get_workspace_id())


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_migrate_dbfs_non_delta_tables(ws, sql_backend, runtime_ctx, make_catalog):
if not ws.config.is_azure:
pytest.skip("temporary: only works in azure test env")
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
src_managed_table = runtime_ctx.make_table(
catalog_name=src_schema.catalog_name, non_delta=True, schema_name=src_schema.name
)

dst_catalog = make_catalog()
dst_schema = runtime_ctx.make_schema(catalog_name=dst_catalog.name, name=src_schema.name)

logger.info(f"dst_catalog={dst_catalog.name}, managed_table={src_managed_table.full_name}")

rules = [Rule.from_src_dst(src_managed_table, dst_schema)]

runtime_ctx.with_table_mapping_rules(rules)
runtime_ctx.with_dummy_azure_resource_permission()
runtime_ctx.tables_migrator.migrate_tables(what=What.DBFS_ROOT_NON_DELTA)

target_tables = list(sql_backend.fetch(f"SHOW TABLES IN {dst_schema.full_name}"))
assert len(target_tables) == 1

target_table_properties = ws.tables.get(f"{dst_schema.full_name}.{src_managed_table.name}").properties
assert target_table_properties["upgraded_from"] == src_managed_table.full_name
assert target_table_properties[Table.UPGRADED_FROM_WS_PARAM] == str(ws.get_workspace_id())


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_migrate_tables_with_cache_should_not_create_table(
ws,
18 changes: 18 additions & 0 deletions tests/unit/hive_metastore/tables/dbfs_parquet.json
@@ -0,0 +1,18 @@
{
"src": {
"catalog": "hive_metastore",
"database": "db1_src",
"name": "managed_dbfs",
"object_type": "MANAGED",
"table_format": "PARQUET",
"location": "dbfs:/some_location"
},
"rule": {
"workspace_name": "workspace",
"catalog_name": "ucx_default",
"src_schema": "db1_src",
"dst_schema": "db1_dst",
"src_table": "managed_dbfs",
"dst_table": "managed_dbfs"
}
}
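
A sketch, not part of the test suite, of how the src and rule blocks above map onto the dataclasses the migrator works with; the PARQUET format together with a dbfs:/ root location is what should classify this table as DBFS_ROOT_NON_DELTA:

from databricks.labs.ucx.hive_metastore.mapping import Rule
from databricks.labs.ucx.hive_metastore.tables import Table, What

src = Table(
    catalog="hive_metastore",
    database="db1_src",
    name="managed_dbfs",
    object_type="MANAGED",
    table_format="PARQUET",
    location="dbfs:/some_location",
)
rule = Rule(
    workspace_name="workspace",
    catalog_name="ucx_default",
    src_schema="db1_src",
    dst_schema="db1_dst",
    src_table="managed_dbfs",
    dst_table="managed_dbfs",
)
assert src.what == What.DBFS_ROOT_NON_DELTA  # non-delta format on the DBFS root
assert rule.as_uc_table_key == "ucx_default.db1_dst.managed_dbfs"
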
46 changes: 46 additions & 0 deletions tests/unit/hive_metastore/test_table_migrate.py
@@ -132,6 +132,52 @@ def test_non_sync_tables_should_produce_proper_queries(ws):
) in backend.queries


def test_dbfs_non_delta_tables_should_produce_proper_queries(ws):
errors = {}
rows = {
"SHOW CREATE TABLE": [
{
"createtab_stmt": "CREATE EXTERNAL TABLE hive_metastore.db1_src.managed_dbfs "
"(foo STRING,bar STRING) USING PARQUET "
"LOCATION 's3://some_location/table'"
}
]
}
backend = MockBackend(fails_on_first=errors, rows=rows)
table_crawler = TablesCrawler(backend, "inventory_database")
udf_crawler = UdfsCrawler(backend, "inventory_database")
grant_crawler = GrantsCrawler(table_crawler, udf_crawler)
table_mapping = table_mapping_mock(["dbfs_parquet"])
group_manager = GroupManager(backend, ws, "inventory_database")
migration_status_refresher = MigrationStatusRefresher(ws, backend, "inventory_database", table_crawler)
principal_grants = create_autospec(PrincipalACL)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
ws,
backend,
table_mapping,
group_manager,
migration_status_refresher,
principal_grants,
)
table_migrate.migrate_tables(what=What.DBFS_ROOT_NON_DELTA)

assert (
"CREATE TABLE IF NOT EXISTS ucx_default.db1_dst.managed_dbfs "
"AS SELECT * FROM hive_metastore.db1_src.managed_dbfs" in backend.queries
)
assert (
"ALTER TABLE hive_metastore.db1_src.managed_dbfs "
"SET TBLPROPERTIES ('upgraded_to' = 'ucx_default.db1_dst.managed_dbfs');"
) in backend.queries
assert (
f"ALTER TABLE ucx_default.db1_dst.managed_dbfs "
f"SET TBLPROPERTIES ('upgraded_from' = 'hive_metastore.db1_src.managed_dbfs' , "
f"'{Table.UPGRADED_FROM_WS_PARAM}' = '12345');"
) in backend.queries


def test_migrate_dbfs_root_tables_should_be_skipped_when_upgrading_external(ws):
errors = {}
rows = {}