Skip to content

Commit

Permalink
Fix query linter integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
JCZuurmond committed Dec 6, 2024
1 parent 7e3e68f commit 5f83f7f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 57 deletions.
13 changes: 10 additions & 3 deletions src/databricks/labs/ucx/source_code/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,17 @@ def from_dict(cls, data: dict[str, Any]) -> Self:
UNKNOWN = "unknown"

source_id: str = UNKNOWN
source_timestamp: datetime = datetime.fromtimestamp(0) # Note: attribute is not used, kept for legacy reasons

source_timestamp: datetime = field(default_factory=lambda: datetime.fromtimestamp(0), compare=False)
"""Unused attribute, kept for legacy reasons"""

source_lineage: list[LineageAtom] = field(default_factory=list)
assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

assessment_start_timestamp: datetime = field(default_factory=lambda: datetime.fromtimestamp(0), compare=False)
"""Unused attribute, kept for legacy reasons"""

assessment_end_timestamp: datetime = field(default_factory=lambda: datetime.fromtimestamp(0), compare=False)
"""Unused attribute, kept for legacy reasons"""

def replace_source(
self,
Expand Down
128 changes: 74 additions & 54 deletions tests/integration/source_code/test_queries.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,77 @@
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawler
from databricks.labs.ucx.source_code.queries import QueryLinter
from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler
from databricks.labs.lsql.backends import Row

from databricks.labs.ucx.source_code.base import DirectFsAccess, LineageAtom, UsedTable

def test_query_linter_lints_queries_and_stores_dfsas_and_tables(simple_ctx, sql_backend, make_query, make_dashboard):
queries = [make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`")]
dashboards = [make_dashboard(query=queries[0])]
queries.append(make_query(sql_query="SELECT * from some_schema.some_table"))
dashboards.append(make_dashboard(query=queries[1]))
linter = QueryLinter(
sql_backend,
simple_ctx.inventory_database,
TableMigrationIndex([]),
simple_ctx.directfs_access_crawler_for_queries,
simple_ctx.used_tables_crawler_for_queries,
[],

def test_query_linter_lints_queries_and_stores_dfsas_and_tables(simple_ctx) -> None:
query_with_dfsa = simple_ctx.make_query(sql_query="SELECT * from csv.`dbfs://some_folder/some_file.csv`")
dashboard_with_dfsa = simple_ctx.make_dashboard(query=query_with_dfsa)
# Lakeview dashboard expects a string, not a legacy query
dashboard_with_used_table = simple_ctx.make_lakeview_dashboard(query="SELECT * FROM some_schema.some_table")

simple_ctx.query_linter.refresh_report()

problems = list(simple_ctx.sql_backend.fetch("SELECT * FROM query_problems", schema=simple_ctx.inventory_database))
assert problems == [
Row(
dashboard_id=dashboard_with_dfsa.id,
dashboard_parent=dashboard_with_dfsa.parent,
dashboard_name=dashboard_with_dfsa.name,
query_id=query_with_dfsa.id,
query_parent=query_with_dfsa.parent,
query_name=query_with_dfsa.name,
code='direct-filesystem-access-in-sql-query',
message='The use of direct filesystem references is deprecated: dbfs://some_folder/some_file.csv',
)
]

dfsas = list(simple_ctx.directfs_access_crawler_for_queries.snapshot())
# By comparing the element instead of the list the `field(compare=False)` of the dataclass attributes take effect
assert len(dfsas) == 1, "Expected one DFSA"
assert dfsas[0] == DirectFsAccess(
source_id=f"{dashboard_with_dfsa.id}/{query_with_dfsa.id}",
source_lineage=[
LineageAtom(
object_type="DASHBOARD",
object_id=dashboard_with_dfsa.id,
other={"parent": dashboard_with_dfsa.parent, "name": dashboard_with_dfsa.name},
),
LineageAtom(
object_type="QUERY",
object_id=f"{dashboard_with_dfsa.id}/{query_with_dfsa.id}",
other={"name": query_with_dfsa.name},
),
],
path="dbfs://some_folder/some_file.csv",
is_read=True,
is_write=False,
)

used_tables = list(simple_ctx.used_tables_crawler_for_queries.snapshot())
# By comparing the element instead of the list the `field(compare=False)` of the dataclass attributes take effect
assert len(used_tables) == 1, "Expected one used table"
# The "query" in the source and object id, and "count" in the name are hardcoded in the
# `make_lakeview_dashboard` fixture
assert used_tables[0] == UsedTable(
source_id=f"{dashboard_with_used_table.dashboard_id}/query",
source_lineage=[
LineageAtom(
object_type="DASHBOARD",
object_id=dashboard_with_used_table.dashboard_id,
other={
"parent": dashboard_with_used_table.parent_path,
"name": dashboard_with_used_table.display_name,
},
),
LineageAtom(
object_type="QUERY",
object_id=f"{dashboard_with_used_table.dashboard_id}/query",
other={"name": "count"},
),
],
catalog_name="hive_metastore",
schema_name="some_schema",
table_name="some_table",
is_read=True,
is_write=False,
)
linter.refresh_report()
all_problems = sql_backend.fetch("SELECT * FROM query_problems", schema=simple_ctx.inventory_database)
problems = [row for row in all_problems if row["query_name"] == queries[0].name]
assert len(problems) == 1
dfsa_crawler = DirectFsAccessCrawler.for_queries(sql_backend, simple_ctx.inventory_database)
all_dfsas = dfsa_crawler.snapshot()
source_id = f"{dashboards[0].id}/{queries[0].id}"
dfsas = [dfsa for dfsa in all_dfsas if dfsa.source_id == source_id]
assert len(dfsas) == 1
assert len(dfsas[0].source_lineage) == 2
lineage = dfsas[0].source_lineage[0]
assert lineage.object_type == "DASHBOARD"
assert lineage.object_id == dashboards[0].id
assert lineage.other
assert lineage.other.get("parent", None) == dashboards[0].parent
assert lineage.other.get("name", None) == dashboards[0].name
lineage = dfsas[0].source_lineage[1]
assert lineage.object_type == "QUERY"
assert lineage.object_id == source_id
assert lineage.other
assert lineage.other.get("name", None) == queries[0].name
used_tables_crawler = UsedTablesCrawler.for_queries(sql_backend, simple_ctx.inventory_database)
all_tables = used_tables_crawler.snapshot()
source_id = f"{dashboards[1].id}/{queries[1].id}"
tables = [table for table in all_tables if table.source_id == source_id]
assert len(tables) == 1
assert len(tables[0].source_lineage) == 2
lineage = tables[0].source_lineage[0]
assert lineage.object_type == "DASHBOARD"
assert lineage.object_id == dashboards[1].id
assert lineage.other
assert lineage.other.get("parent", None) == dashboards[1].parent
assert lineage.other.get("name", None) == dashboards[1].name
lineage = tables[0].source_lineage[1]
assert lineage.object_type == "QUERY"
assert lineage.object_id == source_id
assert lineage.other
assert lineage.other.get("name", None) == queries[1].name

0 comments on commit 5f83f7f

Please sign in to comment.