diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py
index 27a21e50fe..4177446187 100644
--- a/src/databricks/labs/ucx/hive_metastore/tables.py
+++ b/src/databricks/labs/ucx/hive_metastore/tables.py
@@ -38,6 +38,7 @@ class Table:
     upgraded_to: str | None = None
 
     storage_properties: str | None = None
+    is_partitioned: bool = False
 
     DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [
         "/dbfs/",
@@ -258,6 +259,7 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None:
                     "upgraded_to", None
                 ),
                 storage_properties=self._parse_table_props(describe.get("Storage Properties", "").lower()),  # type: ignore[arg-type]
+                is_partitioned="#Partition Information" in describe,
             )
         except Exception as e:  # pylint: disable=broad-exception-caught
             # TODO: https://github.com/databrickslabs/ucx/issues/406
diff --git a/src/databricks/labs/ucx/hive_metastore/tables.scala b/src/databricks/labs/ucx/hive_metastore/tables.scala
index 67b3784f86..025535a83b 100644
--- a/src/databricks/labs/ucx/hive_metastore/tables.scala
+++ b/src/databricks/labs/ucx/hive_metastore/tables.scala
@@ -9,10 +9,11 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.{col,lower,upper}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 // must follow the same structure as databricks.labs.ucx.hive_metastore.tables.Table
 case class TableDetails(catalog: String, database: String, name: String, object_type: String,
-                        table_format: String, location: String, view_text: String, upgraded_to: String, storage_properties: String)
+                        table_format: String, location: String, view_text: String, upgraded_to: String, storage_properties: String, is_partitioned: Boolean)
 
 // recording error log in the database
 case class TableError(catalog: String, database: String, name: String, error: String)
@@ -57,9 +58,16 @@ def metadataForAllTables(databases: Seq[String], queue: ConcurrentLinkedQueue[Ta
           s"$key=$value"
         }.mkString("[", ", ", "]")
 
+        val partitionColumnNames = try {
+          spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName, Some(databaseName))).partitionColumnNames
+        } catch {
+          case e: Exception => null
+        }
+        val isPartitioned = if (partitionColumnNames != null && !partitionColumnNames.isEmpty) true else false
+
         Some(TableDetails("hive_metastore", databaseName, tableName, table.tableType.name, table.provider.orNull,
           table.storage.locationUri.map(_.toString).orNull, table.viewText.orNull,
-          upgraded_to match { case Some(target) => target case None => null }, formattedString))
+          upgraded_to match { case Some(target) => target case None => null }, formattedString, isPartitioned))
       }
     } catch {
       case err: Throwable =>
diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py
index d6a84e1dcb..376f827c24 100644
--- a/src/databricks/labs/ucx/install.py
+++ b/src/databricks/labs/ucx/install.py
@@ -25,6 +25,7 @@
     AlreadyExists,
     BadRequest,
     Cancelled,
+    DatabricksError,
     DataLoss,
     DeadlineExceeded,
     InternalError,
@@ -56,7 +57,7 @@
 from databricks.labs.ucx.assessment.azure import AzureServicePrincipalInfo
 from databricks.labs.ucx.assessment.clusters import ClusterInfo
 from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptInfo
-from databricks.labs.ucx.assessment.jobs import JobInfo, SubmitRunInfo
+from databricks.labs.ucx.assessment.jobs import JobInfo
 from databricks.labs.ucx.assessment.pipelines import PipelineInfo
 from databricks.labs.ucx.config import WorkspaceConfig
 from databricks.labs.ucx.configure import ConfigureClusterOverrides
@@ -162,7 +163,6 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
             functools.partial(table, "table_failures", TableError),
             functools.partial(table, "workspace_objects", WorkspaceObjectInfo),
             functools.partial(table, "permissions", Permissions),
-            functools.partial(table, "submit_runs", SubmitRunInfo),
         ],
     )
     deployer.deploy_view("objects", "queries/views/objects.sql")
@@ -476,20 +476,28 @@ def run_workflow(self, step: str):
         except OperationFailed as err:
             # currently we don't have any good message from API, so we have to work around it.
             job_run = self._ws.jobs.get_run(job_run_waiter.run_id)
-            raise self._infer_error_from_job_run(job_run) from err
+            raise self._infer_nested_error(job_run) from err
 
-    def _infer_error_from_job_run(self, job_run) -> Exception:
-        errors: list[Exception] = []
+    def _infer_nested_error(self, job_run) -> Exception:
+        errors: list[DatabricksError] = []
         timeouts: list[DeadlineExceeded] = []
         assert job_run.tasks is not None
         for run_task in job_run.tasks:
-            error = self._infer_error_from_task_run(run_task)
-            if not error:
+            if not run_task.state:
                 continue
-            if isinstance(error, DeadlineExceeded):
-                timeouts.append(error)
+            if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
+                msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
+                timeouts.append(DeadlineExceeded(msg))
                 continue
-            errors.append(error)
+            if run_task.state.result_state != jobs.RunResultState.FAILED:
+                continue
+            assert run_task.run_id is not None
+            run_output = self._ws.jobs.get_run_output(run_task.run_id)
+            if logger.isEnabledFor(logging.DEBUG):
+                if run_output and run_output.error_trace:
+                    sys.stderr.write(run_output.error_trace)
+            if run_output and run_output.error:
+                errors.append(self._infer_task_exception(f"{run_task.task_key}: {run_output.error}"))
         assert job_run.state is not None
         assert job_run.state.state_message is not None
         if len(errors) == 1:
@@ -499,29 +507,8 @@ def _infer_error_from_job_run(self, job_run) -> Exception:
             return Unknown(job_run.state.state_message)
         return ManyError(all_errors)
 
-    def _infer_error_from_task_run(self, run_task: jobs.RunTask) -> Exception | None:
-        if not run_task.state:
-            return None
-        if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
-            msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
-            return DeadlineExceeded(msg)
-        if run_task.state.result_state != jobs.RunResultState.FAILED:
-            return None
-        assert run_task.run_id is not None
-        run_output = self._ws.jobs.get_run_output(run_task.run_id)
-        if not run_output:
-            msg = f'No run output. {run_task.state.state_message}'
-            return InternalError(msg)
-        if logger.isEnabledFor(logging.DEBUG):
-            if run_output.error_trace:
-                sys.stderr.write(run_output.error_trace)
-        if not run_output.error:
-            msg = f'No error in run output. {run_task.state.state_message}'
-            return InternalError(msg)
-        return self._infer_task_exception(f"{run_task.task_key}: {run_output.error}")
-
     @staticmethod
-    def _infer_task_exception(haystack: str) -> Exception:
+    def _infer_task_exception(haystack: str) -> DatabricksError:
         needles = [
             BadRequest,
             Unauthenticated,
@@ -543,10 +530,8 @@ def _infer_task_exception(haystack: str) -> Exception:
             RequestLimitExceeded,
             Unknown,
             DataLoss,
-            ValueError,
-            KeyError,
         ]
-        constructors: dict[re.Pattern, type[Exception]] = {
+        constructors: dict[re.Pattern, type[DatabricksError]] = {
             re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound,
             re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound,
         }
diff --git a/src/databricks/labs/ucx/mixins/sql.py b/src/databricks/labs/ucx/mixins/sql.py
index b8079f07a9..f1f6bf5589 100644
--- a/src/databricks/labs/ucx/mixins/sql.py
+++ b/src/databricks/labs/ucx/mixins/sql.py
@@ -3,6 +3,7 @@
 import logging
 import random
 import time
+from ast import literal_eval
 from collections.abc import Iterator
 from datetime import timedelta
 from typing import Any
@@ -77,7 +78,7 @@ def __init__(self, ws: WorkspaceClient):
         self.type_converters = {
             ColumnInfoTypeName.ARRAY: json.loads,
             # ColumnInfoTypeName.BINARY: not_supported(ColumnInfoTypeName.BINARY),
-            ColumnInfoTypeName.BOOLEAN: bool,
+            ColumnInfoTypeName.BOOLEAN: lambda value: value.lower() == "true",
             # ColumnInfoTypeName.BYTE: not_supported(ColumnInfoTypeName.BYTE),
             ColumnInfoTypeName.CHAR: str,
             # ColumnInfoTypeName.DATE: not_supported(ColumnInfoTypeName.DATE),
@@ -264,10 +265,17 @@ def _row_converters(self, execute_response):
             type_name = col.type_name
             if not type_name:
                 type_name = ColumnInfoTypeName.NULL
-            conv = self.type_converters.get(type_name, None)
+            if type_name == ColumnInfoTypeName.BOOLEAN:
+                conv = self._convert_boolean_type
+            else:
+                conv = self.type_converters.get(type_name, None)
             if conv is None:
                 msg = f"{col.name} has no {type_name.value} converter"
                 raise ValueError(msg)
             col_conv.append(conv)
         row_factory = type("Row", (Row,), {"__columns__": col_names})
         return col_conv, row_factory
+
+    @staticmethod
+    def _convert_boolean_type(value):
+        return literal_eval(value.capitalize())
diff --git a/src/databricks/labs/ucx/queries/assessment/main/05_0_all_tables.sql b/src/databricks/labs/ucx/queries/assessment/main/05_0_all_tables.sql
index 2aed4ef42a..4142cf7325 100644
--- a/src/databricks/labs/ucx/queries/assessment/main/05_0_all_tables.sql
+++ b/src/databricks/labs/ucx/queries/assessment/main/05_0_all_tables.sql
@@ -24,7 +24,9 @@ SELECT CONCAT(tables.`database`, '.', tables.name) AS name,
         WHEN size_in_bytes < 100000000 THEN CONCAT(CAST(round(size_in_bytes/1024/1024,2) AS string),"MB")
         WHEN size_in_bytes < 100000000000 THEN CONCAT(CAST(round(size_in_bytes/1024/1024/1024,2) AS string),"GB")
         ELSE CONCAT(CAST(round(size_in_bytes/1024/1024/1024/1024,2) AS string),"TB")
-        END AS table_size
+        END AS table_size,
+        IF(is_partitioned is true, "Yes", "No") AS is_partitioned
+
 FROM $inventory.tables left outer join $inventory.table_size on
     $inventory.tables.catalog = $inventory.table_size.catalog and
     $inventory.tables.database = $inventory.table_size.database and
diff --git a/src/databricks/labs/ucx/upgrades/v0.15.0_add_is_partitioned_column.py b/src/databricks/labs/ucx/upgrades/v0.15.0_add_is_partitioned_column.py
new file mode 100644
index 0000000000..429cf7aaf8
--- /dev/null
+++ b/src/databricks/labs/ucx/upgrades/v0.15.0_add_is_partitioned_column.py
@@ -0,0 +1,12 @@
+from databricks.labs.blueprint.installation import Installation
+from databricks.sdk import WorkspaceClient
+
+from databricks.labs.ucx.config import WorkspaceConfig
+from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
+
+
+def upgrade(installation: Installation, ws: WorkspaceClient):
+    config = installation.load(WorkspaceConfig)
+    sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
+    sql_backend.execute(f"ALTER TABLE {config.inventory_database}.tables ADD COLUMN is_partitioned BOOLEAN")
+    installation.save(config)
diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py
index 463829b761..d0e87612ec 100644
--- a/tests/integration/test_installation.py
+++ b/tests/integration/test_installation.py
@@ -18,6 +18,7 @@
 from databricks.sdk.service.iam import PermissionLevel
 
 from databricks.labs.ucx.config import WorkspaceConfig
+from databricks.labs.ucx.hive_metastore import TablesCrawler
 from databricks.labs.ucx.install import (
     PRODUCT_INFO,
     WorkspaceInstallation,
@@ -322,3 +323,27 @@ def test_uninstallation(ws, sql_backend, new_installation):
         ws.jobs.get(job_id=assessment_job_id)
     with pytest.raises(NotFound):
         sql_backend.execute(f"show tables from hive_metastore.{install.config.inventory_database}")
+
+
+@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
+def test_partitioned_tables(ws, sql_backend, new_installation, inventory_schema, make_schema, make_table):
+    install = new_installation()
+
+    schema = make_schema(catalog_name="hive_metastore")
+    sql_backend.execute(
+        f"CREATE TABLE IF NOT EXISTS {schema.full_name}.partitioned_table (column1 string, column2 STRING) PARTITIONED BY (column1)"
+    )
+    sql_backend.execute(
+        f"CREATE TABLE IF NOT EXISTS {schema.full_name}.non_partitioned_table (column1 string, column2 STRING)"
+    )
+    install.run_workflow("assessment")
+
+    tables = TablesCrawler(sql_backend, inventory_schema)
+
+    all_tables = {}
+    for table in tables.snapshot():
+        all_tables[table.key] = table
+
+    assert len(all_tables) >= 2
+    assert all_tables[f"{schema.full_name}.partitioned_table"].is_partitioned is True
+    assert all_tables[f"{schema.full_name}.non_partitioned_table"].is_partitioned is False