Added column is_partitioned to ucx.tables and dashboard #959
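
In short, this change records whether each Hive metastore table is partitioned: the Python and Scala crawlers populate a new `is_partitioned` flag on the inventory record, the assessment dashboard renders it as Yes/No, an upgrade script adds the column to existing inventories, and an integration test covers both cases. A trimmed sketch of the new record shape (illustrative; the real dataclass lives in src/databricks/labs/ucx/hive_metastore/tables.py and carries more fields):

```python
from dataclasses import dataclass


@dataclass
class Table:
    # Trimmed illustration of the ucx inventory record; only the fields
    # relevant to this PR are shown here.
    catalog: str
    database: str
    name: str
    object_type: str
    table_format: str
    storage_properties: str | None = None
    is_partitioned: bool = False  # new: defaults to False, set by the crawlers
```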

Closed · wants to merge 41 commits

Commits (41, from all changes)
6c44c29  Added colum `is_partitioned` inside ucx.tables (dleiva04, Feb 19, 2024)
1ce46b2  Added comment inside tables.scala for explanation (dleiva04, Feb 19, 2024)
e00434f  Integration test WIP (dleiva04, Mar 4, 2024)
7504924  remove comments (dleiva04, Mar 4, 2024)
f6b9ad9  Merge branch 'databrickslabs:main' into is_partitioned (dleiva04, Mar 4, 2024)
a1b20b8  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
26f7e4f  Added colum `is_partitioned` inside ucx.tables (dleiva04, Feb 19, 2024)
38d4194  Added comment inside tables.scala for explanation (dleiva04, Feb 19, 2024)
34a9544  Integration test WIP (dleiva04, Mar 4, 2024)
9b6e0c7  remove comments (dleiva04, Mar 4, 2024)
4df3a83  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
c197d98  adding upgrade script (dleiva04, Mar 6, 2024)
83b515c  Merge branch 'is_partitioned' of github.com:dleiva04/ucx into is_part… (dleiva04, Mar 6, 2024)
50d44c7  Merge branch 'main' into is_partitioned (dleiva04, Mar 6, 2024)
7c9b4dc  Merge branch 'main' into is_partitioned (dleiva04, Mar 6, 2024)
0cf8263  Added colum `is_partitioned` inside ucx.tables (dleiva04, Feb 19, 2024)
94c71da  Added comment inside tables.scala for explanation (dleiva04, Feb 19, 2024)
b90e037  Integration test WIP (dleiva04, Mar 4, 2024)
8aa7966  remove comments (dleiva04, Mar 4, 2024)
36bfa18  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
48d082e  adding upgrade script (dleiva04, Mar 6, 2024)
4f2b0d5  Added comment inside tables.scala for explanation (dleiva04, Feb 19, 2024)
f44d17f  Integration test WIP (dleiva04, Mar 4, 2024)
0120412  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
323ac98  Update (dleiva04, Mar 6, 2024)
2e987d0  change lambda variable name (dleiva04, Mar 6, 2024)
e7fc88c  Added comment inside tables.scala for explanation (dleiva04, Feb 19, 2024)
b22311a  Integration test WIP (dleiva04, Mar 4, 2024)
809d3d5  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
e2093ff  update (dleiva04, Mar 6, 2024)
732c119  update (dleiva04, Mar 6, 2024)
c15114c  update (dleiva04, Mar 6, 2024)
3bfb7f6  update (dleiva04, Mar 7, 2024)
7f4e5d8  update (dleiva04, Mar 7, 2024)
0832d51  Merge branch 'main' into is_partitioned (dleiva04, Mar 7, 2024)
c922a87  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
33bc922  adding upgrade script (dleiva04, Mar 6, 2024)
d74df76  update (dleiva04, Mar 6, 2024)
f850692  update (dleiva04, Mar 7, 2024)
8955c55  updates to comply with fmt and lint (dleiva04, Mar 5, 2024)
fd529a1  Merge branch 'main' into is_partitioned (dleiva04, Mar 7, 2024)

2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -38,6 +38,7 @@ class Table:
upgraded_to: str | None = None

storage_properties: str | None = None
is_partitioned: bool = False

DBFS_ROOT_PREFIXES: typing.ClassVar[list[str]] = [
"/dbfs/",
@@ -258,6 +259,7 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None:
"upgraded_to", None
),
storage_properties=self._parse_table_props(describe.get("Storage Properties", "").lower()), # type: ignore[arg-type]
is_partitioned="#Partition Information" in describe,
)
except Exception as e: # pylint: disable=broad-exception-caught
# TODO: https://github.com/databrickslabs/ucx/issues/406
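
The crawler-side detection in the hunk above keys on DESCRIBE output: a partitioned table's DESCRIBE result carries a partition-information section header, and its presence alone is enough to set the flag. A minimal sketch of the idea (the dict shape and helper name are illustrative, not the crawler's exact parsing):

```python
def looks_partitioned(describe: dict[str, str]) -> bool:
    # DESCRIBE TABLE emits a "# Partition Information" header row for
    # partitioned tables; non-partitioned tables have no such section.
    return any("Partition Information" in key for key in describe)


describe_output = {
    "Catalog": "hive_metastore",
    "Database": "sales",
    "Table": "orders",
    "# Partition Information": "",
    "# col_name": "region",
}
assert looks_partitioned(describe_output) is True
assert looks_partitioned({"Catalog": "hive_metastore", "Table": "dim"}) is False
```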
12 changes: 10 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/tables.scala
@@ -9,10 +9,11 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col,lower,upper}
import org.apache.spark.sql.catalyst.TableIdentifier

// must follow the same structure as databricks.labs.ucx.hive_metastore.tables.Table
case class TableDetails(catalog: String, database: String, name: String, object_type: String,
table_format: String, location: String, view_text: String, upgraded_to: String, storage_properties: String)
table_format: String, location: String, view_text: String, upgraded_to: String, storage_properties: String, is_partitioned: Boolean)

// recording error log in the database
case class TableError(catalog: String, database: String, name: String, error: String)
@@ -57,9 +58,16 @@ def metadataForAllTables(databases: Seq[String], queue: ConcurrentLinkedQueue[Ta
s"$key=$value"
}.mkString("[", ", ", "]")

val partitionColumnNames = try {
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName, Some(databaseName))).partitionColumnNames
} catch {
case e: Exception => null
}
val isPartitioned = if (partitionColumnNames != null && !partitionColumnNames.isEmpty) true else false

Some(TableDetails("hive_metastore", databaseName, tableName, table.tableType.name, table.provider.orNull,
table.storage.locationUri.map(_.toString).orNull, table.viewText.orNull,
upgraded_to match { case Some(target) => target case None => null }, formattedString))
upgraded_to match { case Some(target) => target case None => null }, formattedString, isPartitioned))
}
} catch {
case err: Throwable =>
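
The Scala crawler above asks the session catalog for the table's partition column names and treats a non-empty list as partitioned, swallowing lookup failures. For reference, a rough PySpark sketch of the same idea (assumes a live Spark session; not part of this PR):

```python
from pyspark.sql import SparkSession


def is_partitioned(spark: SparkSession, database: str, table: str) -> bool:
    try:
        # Each catalog column carries an isPartition flag; any True value
        # means the table has at least one partition column.
        columns = spark.catalog.listColumns(table, database)
    except Exception:
        # Mirror the Scala code's behaviour: swallow lookup failures and
        # report the table as not partitioned.
        return False
    return any(col.isPartition for col in columns)
```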
55 changes: 20 additions & 35 deletions src/databricks/labs/ucx/install.py
@@ -25,6 +25,7 @@
AlreadyExists,
BadRequest,
Cancelled,
DatabricksError,
DataLoss,
DeadlineExceeded,
InternalError,
@@ -56,7 +57,7 @@
from databricks.labs.ucx.assessment.azure import AzureServicePrincipalInfo
from databricks.labs.ucx.assessment.clusters import ClusterInfo
from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptInfo
from databricks.labs.ucx.assessment.jobs import JobInfo, SubmitRunInfo
from databricks.labs.ucx.assessment.jobs import JobInfo
from databricks.labs.ucx.assessment.pipelines import PipelineInfo
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.configure import ConfigureClusterOverrides
@@ -162,7 +163,6 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
functools.partial(table, "table_failures", TableError),
functools.partial(table, "workspace_objects", WorkspaceObjectInfo),
functools.partial(table, "permissions", Permissions),
functools.partial(table, "submit_runs", SubmitRunInfo),
],
)
deployer.deploy_view("objects", "queries/views/objects.sql")
@@ -476,20 +476,28 @@ def run_workflow(self, step: str):
except OperationFailed as err:
# currently we don't have any good message from API, so we have to work around it.
job_run = self._ws.jobs.get_run(job_run_waiter.run_id)
raise self._infer_error_from_job_run(job_run) from err
raise self._infer_nested_error(job_run) from err

Collaborator review comment: Properly fetch main branch and rebase. This file is irrelevant to this PR.

def _infer_error_from_job_run(self, job_run) -> Exception:
errors: list[Exception] = []
def _infer_nested_error(self, job_run) -> Exception:
errors: list[DatabricksError] = []
timeouts: list[DeadlineExceeded] = []
assert job_run.tasks is not None
for run_task in job_run.tasks:
error = self._infer_error_from_task_run(run_task)
if not error:
if not run_task.state:
continue
if isinstance(error, DeadlineExceeded):
timeouts.append(error)
if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
timeouts.append(DeadlineExceeded(msg))
continue
errors.append(error)
if run_task.state.result_state != jobs.RunResultState.FAILED:
continue
assert run_task.run_id is not None
run_output = self._ws.jobs.get_run_output(run_task.run_id)
if logger.isEnabledFor(logging.DEBUG):
if run_output and run_output.error_trace:
sys.stderr.write(run_output.error_trace)
if run_output and run_output.error:
errors.append(self._infer_task_exception(f"{run_task.task_key}: {run_output.error}"))
assert job_run.state is not None
assert job_run.state.state_message is not None
if len(errors) == 1:
@@ -499,29 +507,8 @@ def _infer_error_from_job_run(self, job_run) -> Exception:
return Unknown(job_run.state.state_message)
return ManyError(all_errors)

def _infer_error_from_task_run(self, run_task: jobs.RunTask) -> Exception | None:
if not run_task.state:
return None
if run_task.state.result_state == jobs.RunResultState.TIMEDOUT:
msg = f"{run_task.task_key}: The run was stopped after reaching the timeout"
return DeadlineExceeded(msg)
if run_task.state.result_state != jobs.RunResultState.FAILED:
return None
assert run_task.run_id is not None
run_output = self._ws.jobs.get_run_output(run_task.run_id)
if not run_output:
msg = f'No run output. {run_task.state.state_message}'
return InternalError(msg)
if logger.isEnabledFor(logging.DEBUG):
if run_output.error_trace:
sys.stderr.write(run_output.error_trace)
if not run_output.error:
msg = f'No error in run output. {run_task.state.state_message}'
return InternalError(msg)
return self._infer_task_exception(f"{run_task.task_key}: {run_output.error}")

@staticmethod
def _infer_task_exception(haystack: str) -> Exception:
def _infer_task_exception(haystack: str) -> DatabricksError:
needles = [
BadRequest,
Unauthenticated,
@@ -543,10 +530,8 @@ def _infer_task_exception(haystack: str) -> Exception:
RequestLimitExceeded,
Unknown,
DataLoss,
ValueError,
KeyError,
]
constructors: dict[re.Pattern, type[Exception]] = {
constructors: dict[re.Pattern, type[DatabricksError]] = {
re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound,
re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound,
}
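
For context on the error-handling refactor above (which, per the review comment, should disappear after a rebase): the failed task's error text is matched against known SDK error class names, then against a regex map for SQL error codes, and falls back to Unknown. A compressed sketch of that pattern, assuming the databricks.sdk error classes the file already imports (not the exact ucx code):

```python
import re

from databricks.sdk.errors import NotFound, PermissionDenied, Unknown


def infer_task_exception(haystack: str) -> Exception:
    needles = [NotFound, PermissionDenied]  # the real list is much longer
    constructors: dict[re.Pattern, type[Exception]] = {
        re.compile(r".*\[TABLE_OR_VIEW_NOT_FOUND] (.*)"): NotFound,
        re.compile(r".*\[SCHEMA_NOT_FOUND] (.*)"): NotFound,
    }
    # First pass: the exception class name appears verbatim in the message.
    for needle in needles:
        if needle.__name__ in haystack:
            return needle(haystack)
    # Second pass: well-known SQL error codes mapped via regex.
    for pattern, klass in constructors.items():
        if match := pattern.match(haystack):
            return klass(match.group(1))
    return Unknown(haystack)
```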
12 changes: 10 additions & 2 deletions src/databricks/labs/ucx/mixins/sql.py
@@ -3,6 +3,7 @@
import logging
import random
import time
from ast import literal_eval
from collections.abc import Iterator
from datetime import timedelta
from typing import Any
@@ -77,7 +78,7 @@ def __init__(self, ws: WorkspaceClient):
self.type_converters = {
ColumnInfoTypeName.ARRAY: json.loads,
# ColumnInfoTypeName.BINARY: not_supported(ColumnInfoTypeName.BINARY),
ColumnInfoTypeName.BOOLEAN: bool,
ColumnInfoTypeName.BOOLEAN: lambda value: value.lower() == "true",
# ColumnInfoTypeName.BYTE: not_supported(ColumnInfoTypeName.BYTE),
ColumnInfoTypeName.CHAR: str,
# ColumnInfoTypeName.DATE: not_supported(ColumnInfoTypeName.DATE),
@@ -264,10 +265,17 @@ def _row_converters(self, execute_response):
type_name = col.type_name
if not type_name:
type_name = ColumnInfoTypeName.NULL
conv = self.type_converters.get(type_name, None)
if type_name == ColumnInfoTypeName.BOOLEAN:
Collaborator review comment: This is irrelevant
conv = self._convert_boolean_type
else:
conv = self.type_converters.get(type_name, None)
if conv is None:
msg = f"{col.name} has no {type_name.value} converter"
raise ValueError(msg)
col_conv.append(conv)
row_factory = type("Row", (Row,), {"__columns__": col_names})
return col_conv, row_factory

@staticmethod
def _convert_boolean_type(value):
Collaborator review comment: Irrelevant
return literal_eval(value.capitalize())
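
The converter change above matters because BOOLEAN values come back from the statement execution API as strings, and Python's bool() treats any non-empty string as truthy. A quick illustration of the bug and of the two string-aware conversions that appear in this diff:

```python
from ast import literal_eval

value = "false"  # BOOLEAN values arrive as strings from the SQL API

print(bool(value))                       # True  - wrong: non-empty string is truthy
print(value.lower() == "true")           # False - the lambda-based converter
print(literal_eval(value.capitalize()))  # False - the literal_eval-based helper
```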
@@ -24,7 +24,9 @@ SELECT CONCAT(tables.`database`, '.', tables.name) AS name,
WHEN size_in_bytes < 100000000 THEN CONCAT(CAST(round(size_in_bytes/1024/1024,2) AS string),"MB")
WHEN size_in_bytes < 100000000000 THEN CONCAT(CAST(round(size_in_bytes/1024/1024/1024,2) AS string),"GB")
ELSE CONCAT(CAST(round(size_in_bytes/1024/1024/1024/1024,2) AS string),"TB")
END AS table_size
END AS table_size,
IF(is_partitioned is true, "Yes", "No") AS is_partitioned

FROM $inventory.tables left outer join $inventory.table_size on
$inventory.tables.catalog = $inventory.table_size.catalog and
$inventory.tables.database = $inventory.table_size.database and
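
The query above renders the stored boolean as a Yes/No column on the dashboard. The same flag can also be read programmatically from the inventory; a small sketch, assuming a configured ucx SqlBackend and the conventional "ucx" inventory schema (both placeholders here):

```python
def partitioned_report(sql_backend, inventory_schema: str = "ucx") -> None:
    # `sql_backend` is assumed to be a configured ucx SqlBackend.
    query = (
        f"SELECT `database`, name, is_partitioned "
        f"FROM hive_metastore.{inventory_schema}.tables"
    )
    for database, name, is_partitioned in sql_backend.fetch(query):
        print(f"{database}.{name}: {'Yes' if is_partitioned else 'No'}")
```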
@@ -0,0 +1,12 @@
from databricks.labs.blueprint.installation import Installation
Collaborator review comment: change name to v0.16.0
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend


def upgrade(installation: Installation, ws: WorkspaceClient):
config = installation.load(WorkspaceConfig)
sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
sql_backend.execute(f"ALTER TABLE {config.inventory_database}.tables ADD COLUMN is_partitioned BOOLEAN")
installation.save(config)
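
One caveat on the upgrade script above: ALTER TABLE ... ADD COLUMN raises an error if the column already exists, so re-running the upgrade would fail. A hedged sketch of an idempotent variant (assumes sql_backend.fetch yields DESCRIBE rows whose first field is the column name; not part of this PR):

```python
def add_is_partitioned_column(sql_backend, inventory_database: str) -> None:
    # Skip the ALTER when a previous run already added the column.
    rows = sql_backend.fetch(f"DESCRIBE {inventory_database}.tables")
    existing = {row[0] for row in rows}
    if "is_partitioned" in existing:
        return
    sql_backend.execute(
        f"ALTER TABLE {inventory_database}.tables ADD COLUMN is_partitioned BOOLEAN"
    )
```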
25 changes: 25 additions & 0 deletions tests/integration/test_installation.py
@@ -18,6 +18,7 @@
from databricks.sdk.service.iam import PermissionLevel

from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.install import (
PRODUCT_INFO,
WorkspaceInstallation,
@@ -322,3 +323,27 @@ def test_uninstallation(ws, sql_backend, new_installation):
ws.jobs.get(job_id=assessment_job_id)
with pytest.raises(NotFound):
sql_backend.execute(f"show tables from hive_metastore.{install.config.inventory_database}")


@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
def test_partitioned_tables(ws, sql_backend, new_installation, inventory_schema, make_schema, make_table):
install = new_installation()

schema = make_schema(catalog_name="hive_metastore")
sql_backend.execute(
f"CREATE TABLE IF NOT EXISTS {schema.full_name}.partitioned_table (column1 string, column2 STRING) PARTITIONED BY (column1)"
)
sql_backend.execute(
f"CREATE TABLE IF NOT EXISTS {schema.full_name}.non_partitioned_table (column1 string, column2 STRING)"
)
install.run_workflow("assessment")

tables = TablesCrawler(sql_backend, inventory_schema)

all_tables = {}
for table in tables.snapshot():
all_tables[table.key] = table

assert len(all_tables) >= 2
assert all_tables[f"{schema.full_name}.partitioned_table"].is_partitioned is True
assert all_tables[f"{schema.full_name}.non_partitioned_table"].is_partitioned is False