diff --git a/docs/ucx/docs/reference/index.mdx b/docs/ucx/docs/reference/index.mdx index 7c17f2cd12..46438ddc11 100644 --- a/docs/ucx/docs/reference/index.mdx +++ b/docs/ucx/docs/reference/index.mdx @@ -20,3 +20,4 @@ It includes the following: - [Table Upgrade](/docs/reference/table_upgrade) - [Troubleshooting Guide](/docs/reference/troubleshooting) - [Workflows](/docs/reference/workflows) +- [Workspace Table Scanning](/docs/reference/workspace-table-scanning) diff --git a/docs/ucx/docs/reference/workflows/index.mdx b/docs/ucx/docs/reference/workflows/index.mdx index a8312cc765..6f3aba4167 100644 --- a/docs/ucx/docs/reference/workflows/index.mdx +++ b/docs/ucx/docs/reference/workflows/index.mdx @@ -251,6 +251,11 @@ The output is processed and displayed in the migration dashboard using the in `r - run the [`create-table-mapping` command](/docs/reference/commands#create-table-mapping) - or manually create a `mapping.csv` file in Workspace -> Applications -> ucx +## [EXPERIMENTAL] Workspace Code Scanner Workflow + +The [`workspace-code-scanner-experimental`](/docs/reference/workspace-table-scanning) workflow scans all notebooks and files in the workspace for table usage. The workflow performs a static analysis of the code to identify the tables and views it references. This helps identify which schemas are in use so that the assessment can be focused on them. The results are stored in the `used_tables_in_workspace` table in the inventory database. + + ## [EXPERIMENTAL] Migration Progress Workflow The `migration-progress-experimental` workflow populates the tables visualized in the diff --git a/docs/ucx/docs/reference/workspace-table-scanning.md b/docs/ucx/docs/reference/workspace-table-scanning.md new file mode 100644 index 0000000000..11144ea3da --- /dev/null +++ b/docs/ucx/docs/reference/workspace-table-scanning.md @@ -0,0 +1,161 @@ +# Workspace Table Scanning + +UCX now supports comprehensive table usage detection across your entire Databricks workspace, beyond just workflows and dashboards. This capability allows you to discover all table references in notebooks and files within specified workspace paths. + +## Overview + +In addition to the existing workflow and dashboard analysis, workspace scanning covers: +- **Workspace**: Tables used in any notebook or file within specified workspace paths + +**Key Benefits:** +- **Discovery-first approach**: Runs as a standalone workflow before assessment +- **Scope optimization**: Can limit Hive Metastore scanning to only the databases that are referenced +- **Complete coverage**: Finds table usage beyond just workflows and dashboards +- **Independent execution**: Runs on demand without a full assessment cycle + +## How It Works + +The workspace table scanner: + +1. **Discovers Objects**: Recursively scans workspace paths to find all notebooks and supported files +2. **Analyzes Content**: Uses UCX's linting framework to extract table usage from each object +3. **Tracks Lineage**: Maintains detailed source lineage information for each table reference +4. **Stores Results**: Saves findings to the `used_tables_in_workspace` inventory table
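+
+As an illustration of what the scanner stores, a single result row conceptually looks like the sketch below. The field names follow the `used_tables_in_workspace` schema and the lineage structure introduced in this change; the table, path, and language values are made up:
+
+```python
+# Hypothetical example of one scanner result row; all names are illustrative only.
+example_row = {
+    "catalog_name": "hive_metastore",
+    "schema_name": "sales",
+    "table_name": "customers",
+    "source_id": "/Shared/analytics/daily_report",  # workspace object that referenced the table
+    "source_lineage": [
+        {"object_type": "NOTEBOOK", "object_id": "/Shared/analytics/daily_report", "other": {"language": "PYTHON"}}
+    ],
+    "is_write": False,  # a read-only reference in this example
+}
+print(example_row["schema_name"])  # -> sales
+```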
+ +## Supported File Types + +The scanner supports: +- **Notebooks**: Python, SQL +- **Files**: Python (.py), SQL (.sql) + +## Configuration + +### Via Standalone Workflow + +UCX now includes a dedicated `workspace-code-scanner-experimental` workflow that runs independently of the assessment: + +**Workflow Parameters:** +- `paths`: comma-separated list of workspace paths to scan (required; the task logs an error and scans nothing when it is empty) + +### Via CLI Command +You can also run the scanner via the UCX CLI: + +```bash +databricks labs ucx run-workspace-code-scanner --paths "/Users,/Shared" +``` + +### Programmatic Usage + +```python +from databricks.labs.ucx.source_code.linters.workspace import WorkspaceCodeLinter +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler + +# Initialize components +workspace_linter = WorkspaceCodeLinter( + ws=workspace_client, + sql_backend=sql_backend, + inventory_database="ucx_inventory", + path_lookup=path_lookup, + used_tables_crawler=UsedTablesCrawler.for_workspace(sql_backend, "ucx_inventory") +) + +# Scan specific paths +workspace_paths = ["/Users/data_team", "/Shared/analytics"] +workspace_linter.scan_workspace_for_tables(workspace_paths) +``` + +## Typical Workflow Sequence + +For an optimal UCX assessment with scope optimization: + +```bash +# 1. Run the workspace code scanner first (standalone) +databricks labs ucx run-workspace-code-scanner --paths "/" + +# 2. Use the results to configure a scope-limited assessment +# (see the query below for deriving the include_databases list) + +# 3. Update your UCX config with the discovered databases +# include_databases: ["database1", "database2", "database3"] + +# 4. Run the assessment with the optimized scope +databricks labs ucx ensure-assessment-run +``` + +**Scope Optimization Example:** +```sql +-- Query to get databases for config +SELECT DISTINCT schema_name +FROM ucx_inventory.used_tables_in_workspace +WHERE catalog_name = 'hive_metastore' +ORDER BY schema_name; +``` + +## Results and Analysis + +### Inventory Table + +Results are stored in `{inventory_database}.used_tables_in_workspace` with the following schema: + +| Column | Type | Description | +|--------|------|-------------| +| `catalog_name` | string | Catalog containing the table | +| `schema_name` | string | Schema containing the table | +| `table_name` | string | Name of the table | +| `source_id` | string | Path to the workspace object | +| `source_lineage` | array | Detailed lineage information | +| `is_write` | boolean | Whether this is a write operation | + +### Example Queries + +**Most used tables across workspace:** +```sql +SELECT + catalog_name, + schema_name, + table_name, + COUNT(*) as usage_count +FROM ucx_inventory.used_tables_in_workspace +GROUP BY catalog_name, schema_name, table_name +ORDER BY usage_count DESC; +``` + +**Table usage by workspace area:** +```sql +SELECT + CASE + WHEN source_id LIKE '%/Users/%' THEN 'User Notebooks' + WHEN source_id LIKE '%/Shared/%' THEN 'Shared Notebooks' + WHEN source_id LIKE '%/Repos/%' THEN 'Repository Code' + ELSE 'Other' + END as workspace_area, + COUNT(DISTINCT CONCAT(catalog_name, '.', schema_name, '.', table_name)) as unique_tables, + COUNT(*) as total_references +FROM ucx_inventory.used_tables_in_workspace +GROUP BY workspace_area; +``` + +**Files with the most table dependencies:** +```sql +SELECT + source_id, + COUNT(DISTINCT CONCAT(catalog_name, '.', schema_name, '.', table_name)) as table_count +FROM ucx_inventory.used_tables_in_workspace +GROUP BY source_id +ORDER BY table_count DESC +LIMIT 20; +```
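+
+To turn these results into an assessment scope, you can derive the `include_databases` list directly from the inventory table. The snippet below is an illustrative sketch, not part of UCX: it assumes it runs in a Databricks notebook where `spark` is available and that the inventory database is named `ucx_inventory`:
+
+```python
+# Hypothetical helper: build an include_databases list for the UCX config from
+# the Hive Metastore schemas referenced anywhere in the scanned workspace paths.
+rows = spark.sql(
+    """
+    SELECT DISTINCT schema_name
+    FROM ucx_inventory.used_tables_in_workspace
+    WHERE catalog_name = 'hive_metastore'
+    ORDER BY schema_name
+    """
+).collect()
+databases = [row["schema_name"] for row in rows]
+print("include_databases:", databases)  # paste this list into your UCX config
+```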
+ +## Best Practices + +### Path Selection +- Start with critical paths like `/Shared/production` or specific team directories +- Avoid scanning the entire workspace initially; start with a few paths to gauge the performance impact +- Exclude test/scratch directories to focus on production code + +### Regular Scanning +- Run workspace scans weekly or monthly to track evolving dependencies +- Compare results over time to identify new table dependencies + +### Result Analysis +- Combine workspace results with workflow and dashboard results for a complete picture +- Use the lineage information to understand code relationships diff --git a/labs.yml b/labs.yml index 1a355206f0..d067337d6e 100644 --- a/labs.yml +++ b/labs.yml @@ -98,6 +98,16 @@ commands: description: (Optional) Whether to run the assess-workflows for the collection of workspaces with ucx installed. Default is False. + - name: run-workspace-code-scanner + description: (Experimental) Trigger the `workspace-code-scanner-experimental` job to scan the workspace code and collect the tables referenced in it. + flags: + - name: paths + description: A comma-separated list of workspace paths to scan. + - name: run-as-collection + description: (Optional) Whether to run the workspace-code-scanner for the collection of workspaces with ucx + installed. Default is False. + + + - name: update-migration-progress description: trigger the `migration-progress-experimental` job to refresh the inventory that tracks the workspace resources and their migration status. diff --git a/src/databricks/labs/ucx/assessment/workflows.py b/src/databricks/labs/ucx/assessment/workflows.py index e9bea3c124..6b5324274a 100644 --- a/src/databricks/labs/ucx/assessment/workflows.py +++ b/src/databricks/labs/ucx/assessment/workflows.py @@ -247,3 +247,25 @@ def failing_task(self, ctx: RuntimeContext): logger.warning("This is a test warning message.") logger.error("This is a test error message.") raise ValueError("This task is supposed to fail.") + + +class WorkspaceCodeScanner(Workflow): + def __init__(self): + super().__init__('workspace-code-scanner-experimental', [JobParameterDefinition(name="paths", default="")]) + + @job_task + def scan_workspace_code(self, ctx: RuntimeContext): + """Scan the workspace for table usage using WorkspaceCodeLinter.""" + logger.info("Starting workspace table scanning") + + # Get the path parameter and split by comma if multiple paths + path_param = ctx.named_parameters.get("paths", "") + if not path_param: + logger.error("No path parameter provided. 
Please provide a comma-separated list of paths to scan.") + else: + paths = [p.strip() for p in path_param.split(",") if p.strip()] + + # Create and use the workspace linter + workspace_linter = ctx.workspace_tables_linter + workspace_linter.scan_workspace_for_tables(paths) + logger.info("Workspace table scanning completed and results stored in inventory database") diff --git a/src/databricks/labs/ucx/cli.py b/src/databricks/labs/ucx/cli.py index c72c2113cc..e85c642899 100644 --- a/src/databricks/labs/ucx/cli.py +++ b/src/databricks/labs/ucx/cli.py @@ -257,6 +257,25 @@ def run_assess_workflows( deployed_workflows.run_workflow("assess-workflows", skip_job_wait=run_as_collection) +@ucx.command +def run_workspace_code_scanner_experimental( + w: WorkspaceClient, run_as_collection: bool = False, a: AccountClient | None = None, paths: str | None = None +): + """Manually trigger the workspace-code-scanner-experimental job.""" + if paths is None: + logger.error("--paths is a required parameter.") + return + + workspace_contexts = _get_workspace_contexts(w, a, run_as_collection) + for ctx in workspace_contexts: + workspace_id = ctx.workspace_client.get_workspace_id() + deployed_workflows = ctx.deployed_workflows + logger.info(f"Starting 'workspace-code-scanner-experimental' workflow in workspace: {workspace_id}") + deployed_workflows.run_workflow( + "workspace-code-scanner-experimental", named_parameters={"paths": paths}, skip_job_wait=run_as_collection + ) + + @ucx.command def update_migration_progress( w: WorkspaceClient, diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 15c3e7268b..18e12b6079 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -66,6 +66,7 @@ from databricks.labs.ucx.progress.install import VerifyProgressTracking from databricks.labs.ucx.source_code.graph import DependencyResolver from databricks.labs.ucx.source_code.linters.jobs import WorkflowLinter +from databricks.labs.ucx.source_code.linters.workspace import WorkspaceCodeLinter from databricks.labs.ucx.source_code.known import KnownList from databricks.labs.ucx.source_code.folders import FolderLoader from databricks.labs.ucx.source_code.files import FileLoader, ImportFileResolver @@ -610,6 +611,16 @@ def query_linter(self) -> QueryLinter: self.config.debug_listing_upper_limit, ) + @cached_property + def workspace_tables_linter(self) -> WorkspaceCodeLinter: + return WorkspaceCodeLinter( + self.workspace_client, + self.sql_backend, + self.inventory_database, + self.path_lookup, + self.used_tables_crawler_for_workspace, + ) + @cached_property def directfs_access_crawler_for_paths(self) -> DirectFsAccessCrawler: return DirectFsAccessCrawler.for_paths(self.sql_backend, self.inventory_database) @@ -626,6 +637,10 @@ def used_tables_crawler_for_paths(self): def used_tables_crawler_for_queries(self): return UsedTablesCrawler.for_queries(self.sql_backend, self.inventory_database) + @cached_property + def used_tables_crawler_for_workspace(self): + return UsedTablesCrawler.for_workspace(self.sql_backend, self.inventory_database) + @cached_property def redash(self) -> Redash: return Redash( diff --git a/src/databricks/labs/ucx/runtime.py b/src/databricks/labs/ucx/runtime.py index 33a7c0ac7b..0ad543fa54 100644 --- a/src/databricks/labs/ucx/runtime.py +++ b/src/databricks/labs/ucx/runtime.py @@ -6,7 +6,7 @@ from databricks.sdk.config import with_user_agent_extra from databricks.labs.ucx.__about__ import 
__version__ -from databricks.labs.ucx.assessment.workflows import Assessment, Failing, AssessWorkflows +from databricks.labs.ucx.assessment.workflows import Assessment, Failing, AssessWorkflows, WorkspaceCodeScanner from databricks.labs.ucx.contexts.workflow_task import RuntimeContext from databricks.labs.ucx.framework.tasks import Workflow, parse_args from databricks.labs.ucx.installer.logs import TaskLogger @@ -52,6 +52,7 @@ def all(cls): ConvertWASBSToADLSGen2(), PermissionsMigrationAPI(), MigrationRecon(), + WorkspaceCodeScanner(), Failing(), ] ) diff --git a/src/databricks/labs/ucx/source_code/linters/workspace.py b/src/databricks/labs/ucx/source_code/linters/workspace.py new file mode 100644 index 0000000000..f1e9cd395c --- /dev/null +++ b/src/databricks/labs/ucx/source_code/linters/workspace.py @@ -0,0 +1,451 @@ +"""Workspace-wide linter for table usage detection. + +This module provides functionality to scan all notebooks and files in a workspace +path and collect table usage information using the UCX linting framework. +""" + +import ast +import base64 +import logging +from functools import partial + +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.workspace import ObjectType, Language +from databricks.labs.blueprint.parallel import Threads +from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex + +from databricks.labs.ucx.source_code.base import ( + UsedTable, + CurrentSessionState, + LineageAtom, +) +from databricks.labs.ucx.source_code.linters.context import LinterContext +from databricks.labs.ucx.source_code.path_lookup import PathLookup +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler +from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo +from databricks.labs.ucx.workspace_access.listing import WorkspaceListing + +logger = logging.getLogger(__name__) + + +class WorkspaceCodeLinter: + """Linter for extracting table usage from all notebooks and files in workspace paths. + + This class scans workspace paths recursively to find all notebooks and files, + then uses the UCX linting framework to extract table usage information. + """ + + def __init__( + self, + ws: WorkspaceClient, + sql_backend: SqlBackend, + inventory_database: str, + path_lookup: PathLookup, + used_tables_crawler: UsedTablesCrawler, + max_workers: int = 10, + ): + """Initialize the WorkspaceTablesLinter. + + Args: + ws: Databricks WorkspaceClient for API access + sql_backend: SQL backend for storing results + inventory_database: Database name for storing inventory + path_lookup: Path lookup for resolving dependencies + used_tables_crawler: Crawler for storing used table results + max_workers: Maximum number of parallel workers for processing + """ + self._ws = ws + self._sql_backend = sql_backend + self._inventory_database = inventory_database + self._path_lookup = path_lookup + self._used_tables_crawler = used_tables_crawler + self._max_workers = max_workers + + def _get_language_from_path(self, path: str) -> Language | None: + """Determine language from file path extension. + + Args: + path: File path + + Returns: + Language enum or None if not supported + """ + + extension = path.lower().split('.')[-1] if '.' 
in path else '' + + language_map = { + 'py': Language.PYTHON, + 'sql': Language.SQL, + 'scala': Language.SCALA, + 'r': Language.R, + } + + return language_map.get(extension) + + def _discover_workspace_objects(self, workspace_path: str) -> list[WorkspaceObjectInfo]: + """Discover all relevant workspace objects in the given path. + + Args: + workspace_path: Workspace path to scan + + Returns: + List of workspace objects (notebooks and files) + """ + ws_listing = WorkspaceListing(self._ws, num_threads=self._max_workers, with_directories=False) + workspace_objects = [] + + for obj in ws_listing.walk(workspace_path): + if obj is None or obj.object_type is None: + continue + + # Only process notebooks and files that can contain code + if obj.object_type in (ObjectType.NOTEBOOK, ObjectType.FILE): + raw = obj.as_dict() + obj_path = raw.get("path") + if obj_path: # Only include objects with valid paths + workspace_objects.append( + WorkspaceObjectInfo( + path=obj_path, + object_type=raw.get("object_type"), + object_id=str(raw.get("object_id")), + language=raw.get("language"), + ) + ) + + logger.info(f"Discovered {len(workspace_objects)} workspace objects in {workspace_path}") + return workspace_objects + + def _extract_tables_from_objects(self, workspace_objects: list[WorkspaceObjectInfo]) -> list[UsedTable]: + """Extract table usage from workspace objects using parallel processing. + + Args: + workspace_objects: List of workspace objects to process + + Returns: + List of used tables found in the objects + """ + if not workspace_objects: + return [] + + tasks = [] + for obj in workspace_objects: + if obj.path: + tasks.append(partial(self._extract_tables_from_object, obj)) + + logger.info(f"Processing {len(tasks)} workspace objects in parallel...") + results, errors = Threads.gather('extracting tables from workspace objects', tasks) + + if errors: + logger.warning(f"Encountered {len(errors)} errors during processing") + for error in errors[:5]: # Log first 5 errors + logger.warning("Logging first 5 errors:") + logger.warning(f"Processing error: {error}") + + all_tables = [] + for tables in results: + all_tables.extend(tables) + + return all_tables + + def _extract_tables_from_object(self, obj: WorkspaceObjectInfo) -> list[UsedTable]: + """Extract table usage from a single workspace object. + + Args: + obj: Workspace object to process + + Returns: + List of used tables found in the object + """ + if not obj.path: + return [] + + # Create a source lineage for the object + source_lineage = [ + LineageAtom( + object_type=obj.object_type or "UNKNOWN", + object_id=obj.path or "UNKNOWN", + other={ + "language": obj.language or "UNKNOWN", + }, + ) + ] + + # Determine if this is a notebook or file based on object type and path + # For now, let's be more conservative and only treat explicit NOTEBOOK types as notebooks + # We can enhance this later with content-based detection if needed + if obj.object_type == ("NOTEBOOK"): + return self._extract_tables_from_notebook(obj, source_lineage) + if obj.object_type == ("FILE"): + return self._extract_tables_from_file(obj, source_lineage) + logger.warning(f"Unsupported object type: {obj.object_type}") + return [] + + def _get_str_content_from_path(self, path: str) -> str: + """Download and decode content from a workspace path. 
+ + Args: + path: Path to the workspace path + + Returns: + Decoded content as string + """ + # Download file content + export_response = self._ws.workspace.export(path) + if isinstance(export_response.content, bytes): + return export_response.content.decode('utf-8') + try: + # If content is a string representation of bytes, convert it back to bytes + # Try to evaluate the string as a bytes literal + content_bytes = ast.literal_eval(str(export_response.content)) + return content_bytes.decode('utf-8') + except (ValueError, SyntaxError): + # If that fails, try base64 decoding + try: + return base64.b64decode(str(export_response.content)).decode('utf-8') + except ValueError: + # If that also fails, treat it as a regular string + return str(export_response.content) + + def _extract_tables_from_notebook( + self, obj: WorkspaceObjectInfo, source_lineage: list[LineageAtom] + ) -> list[UsedTable]: + """Extract table usage from a notebook. + + + Args: + obj: Notebook object + source_lineage: Source lineage for tracking + + Returns: + List of used tables found in the notebook + """ + # Download notebook content + content = self._get_str_content_from_path(obj.path) + return self._extract_tables_from_notebook_content(obj, content, source_lineage) + + def _extract_tables_from_file(self, obj: WorkspaceObjectInfo, source_lineage: list[LineageAtom]) -> list[UsedTable]: + """Extract table usage from a file. + + Args: + obj: File object + source_lineage: Source lineage for tracking + + Returns: + List of used tables found in the file + """ + if not obj.path: + return [] + content = self._get_str_content_from_path(obj.path) + + # Check if this is actually a Databricks notebook stored as a file + if "# Databricks notebook source" in content: + logger.info(f"Detected notebook content in file {obj.path}, treating as notebook") + return self._extract_tables_from_notebook_content(obj, content, source_lineage) + + return self._process_file_content_for_tables(obj, content, source_lineage) + + @staticmethod + def _get_clean_cell_content(cell_content: str) -> str: + """Clean up cell content by removing magic commands and leading/trailing whitespace. + + Args: + cell_content: Raw cell content + + Returns: + Cleaned cell content + """ + if not cell_content.strip().startswith('# MAGIC'): + return cell_content + + # Remove MAGIC prefixes and clean up + clean_lines = [] + for line in cell_content.split('\n'): + if line.strip().startswith('# MAGIC'): + # Remove the # MAGIC prefix + clean_line = line.replace('# MAGIC ', '') + # For SQL magic commands, also remove the %sql part + if clean_line.strip() != '%sql': + clean_lines.append(clean_line) + else: + clean_lines.append(line) + return '\n'.join(clean_lines) + + def _get_language_from_content(self, cell_content: str) -> Language: + """Determine the language of a notebook cell based on magic commands. + + Args: + cell_content: Raw cell content + + Returns: + Detected Language enum (default to Python) + """ + + if cell_content.strip().startswith('# MAGIC %sql'): + return Language.SQL + if cell_content.strip().startswith('# MAGIC %scala'): + return Language.SCALA + if cell_content.strip().startswith('# MAGIC %r'): + return Language.R + return Language.PYTHON + + def _extract_tables_from_notebook_content( + self, obj: WorkspaceObjectInfo, content: str, source_lineage: list[LineageAtom] + ) -> list[UsedTable]: + """Extract table usage from notebook content without using Notebook.parse(). 
+ + This method handles notebook content that might not parse correctly with Notebook.parse() + by manually extracting Python/SQL code from the notebook cells. + + Args: + obj: Workspace object + content: Notebook content as string + source_lineage: Source lineage for tracking + + Returns: + List of used tables found in the notebook + """ + # Split content into lines and extract cells manually + lines = content.split('\n') + if not lines[0].startswith("# Databricks notebook source"): + logger.warning(f"Content doesn't start with notebook header: {obj.path}") + return [] + + # Extract cells by looking for # COMMAND ---------- separators + cells = [] + current_cell: list[str] = [] + + for line in lines[1:]: # Skip the header line + if line.strip() == "# COMMAND ----------": + if current_cell: + cells.append('\n'.join(current_cell)) + current_cell = [] + else: + current_cell.append(line) + + # Add the last cell if it exists + if current_cell: + cells.append('\n'.join(current_cell)) + + logger.info(f"Extracted {len(cells)} cells from notebook {obj.path}") + + return self._process_cells_for_tables(obj, cells, source_lineage) + + def _process_cells_for_tables( + self, obj: WorkspaceObjectInfo, cells: list[str], source_lineage: list[LineageAtom] + ) -> list[UsedTable]: + """Process notebook cells to extract table usage. + + Args: + obj: Workspace object + cells: List of cell contents + source_lineage: Source lineage for tracking + + Returns: + List of used tables found in the cells + """ + # Process each cell to extract tables + all_tables = [] + dummy_index = TableMigrationIndex([]) + linter_context = LinterContext(dummy_index, CurrentSessionState()) + + for i, cell_content in enumerate(cells): + if not cell_content.strip(): + continue + + # Determine cell language (default to Python for now) + # Check if cell has magic commands that indicate language + cell_language = self._get_language_from_content(cell_content) + + # Get appropriate collector for the cell language + try: + collector = linter_context.tables_collector(cell_language) + except ValueError as e: + logger.warning(f"Failed to get collector for language {cell_language}: {e}") + continue + + # Clean up the cell content (remove MAGIC prefixes) + clean_content = self._get_clean_cell_content(cell_content) + + cell_tables = list(collector.collect_tables(clean_content)) + logger.info(f"Found {len(cell_tables)} tables in cell {i}") + + # Add source lineage to each table + for table in cell_tables: + all_tables.append( + table.replace_source( + source_id=obj.path, + source_lineage=source_lineage, + ) + ) + return all_tables + + def _process_file_content_for_tables( + self, obj: WorkspaceObjectInfo, content: str, source_lineage: list[LineageAtom] + ) -> list[UsedTable]: + """Process file content to extract table usage. 
+ + Args: + obj: Workspace object + content: File content as string + source_lineage: Source lineage for tracking + + Returns: + List of used tables found in the file content + """ + # Determine language from file extension + language = self._get_language_from_path(obj.path) + if not language: + logger.debug(f"Unsupported file type: {obj.path}") + return [] + + # Create linter context with dummy migration index to use full collectors + dummy_index = TableMigrationIndex([]) + linter_context = LinterContext(dummy_index, CurrentSessionState()) + + # Get appropriate collector for the language + # At this point language is guaranteed to be not None + assert language is not None + collector = linter_context.tables_collector(language) + tables = list(collector.collect_tables(str(content))) + + # Add source lineage to each table + result_tables = [] + for table in tables: + if hasattr(table, 'replace_source'): + result_tables.append( + table.replace_source( + source_id=obj.path, + source_lineage=source_lineage, + ) + ) + else: + result_tables.append(table) + + return result_tables + + def scan_workspace_for_tables(self, workspace_paths: list[str] | None = None) -> None: + """Scan workspace paths for table usage and store results. + + Args: + workspace_paths: List of workspace paths to scan. If None, scans entire workspace. + """ + if workspace_paths is None: + workspace_paths = ["/"] + + all_tables = [] + for workspace_path in workspace_paths: + logger.info(f"Scanning workspace path: {workspace_path}") + workspace_objects = self._discover_workspace_objects(workspace_path) + logger.info(f"Found {len(workspace_objects)} workspace objects in {workspace_path}") + tables_from_path = self._extract_tables_from_objects(workspace_objects) + logger.info(f"Extracted {len(tables_from_path)} used tables from {workspace_path}") + all_tables.extend(tables_from_path) + + # Store all discovered tables in the database + if all_tables: + logger.info(f"Storing {len(all_tables)} discovered tables in database") + self._used_tables_crawler.dump_all(all_tables) + logger.info(f"Successfully stored {len(all_tables)} tables") + else: + logger.info("No tables found to store") diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index b5cdb77c0b..81cf265cdb 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -33,6 +33,10 @@ def for_paths(cls, backend: SqlBackend, schema: str) -> UsedTablesCrawler: def for_queries(cls, backend: SqlBackend, schema: str) -> UsedTablesCrawler: return UsedTablesCrawler(backend, schema, "used_tables_in_queries") + @classmethod + def for_workspace(cls, backend: SqlBackend, schema: str) -> UsedTablesCrawler: + return UsedTablesCrawler(backend, schema, "used_tables_in_workspace") + def dump_all(self, tables: Sequence[UsedTable]) -> None: """This crawler doesn't follow the pull model because the fetcher fetches data for 3 crawlers, not just one It's not **bad** because all records are pushed at once. 
diff --git a/tests/integration/source_code/test_workspace_code_scanner.py b/tests/integration/source_code/test_workspace_code_scanner.py new file mode 100644 index 0000000000..d63a5c9336 --- /dev/null +++ b/tests/integration/source_code/test_workspace_code_scanner.py @@ -0,0 +1,122 @@ +"""Integration tests for WorkspaceTablesLinter functionality.""" + +import logging +from datetime import timedelta + +from databricks.sdk.errors import NotFound +from databricks.sdk.retries import retried +from databricks.sdk.service.workspace import ImportFormat, Language + +logger = logging.getLogger(__name__) + + +@retried(on=[NotFound], timeout=timedelta(minutes=2)) +def test_workspace_tables_linter_python_notebook(ws, simple_ctx, make_random, request): + """Test that WorkspaceTablesLinter correctly identifies table usage in Databricks notebooks.""" + + # Create a test Databricks notebook with table references + notebook_content = '''# Databricks notebook source +# MAGIC %md +# MAGIC # Test Notebook for Table Discovery +# MAGIC +# MAGIC This notebook contains various table references for testing. + +# COMMAND ---------- + +# Read from a table +df1 = spark.table("sales.customers") +df2 = spark.sql("SELECT * FROM marketing.campaigns") + +# COMMAND ---------- + +# Write to a table using DataFrame method chaining +df1.write.mode("overwrite").saveAsTable("analytics.customer_analysis") + +# COMMAND ---------- + +# PySpark table operations +spark.read.table("warehouse.products").createOrReplaceTempView("temp_products") + +# COMMAND ---------- +dbutils.fs.rm("abfss://standard@accounting.dfs.core.windows.net/projects/accounting/records",True) + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC drop table accounting.records + +# COMMAND ---------- + +# DBTITLE 1,accounting.records table creation +# MAGIC %sql +# MAGIC CREATE TABLE accounting.records ( +# MAGIC team STRING, +# MAGIC expenses STRING, +# MAGIC team_id STRING, +# MAGIC dest_cd STRING, +# MAGIC dest_desc STRING, +# MAGIC USING delta +# MAGIC location 'abfss://standard@accounting.dfs.core.windows.net/projects/accounting/records' +''' + + def cleanup(): + try: + ws.workspace.delete(notebook_path, recursive=False) + except NotFound: + pass + + request.addfinalizer(cleanup) + + # Upload the Databricks notebook to workspace + ws.workspace.mkdirs("/tmp") + notebook_path = f"/tmp/test_workspace_linting_{make_random()}.py" + ws.workspace.upload( + path=notebook_path, + content=notebook_content, + format=ImportFormat.SOURCE, + language=Language.PYTHON, + overwrite=True, + ) + + # Run the workspace tables linter on the uploaded notebook + workspace_linter = simple_ctx.workspace_tables_linter + logger.info("Starting workspace scan for path: /tmp") + workspace_linter.scan_workspace_for_tables(["/tmp"]) + logger.info("Workspace scan completed") + + # Verify results in used_tables_in_workspace table + cursor = simple_ctx.sql_backend.fetch( + f""" + SELECT catalog_name, schema_name, table_name, source_id, is_write + FROM {simple_ctx.inventory_database}.used_tables_in_workspace + WHERE source_id LIKE '/tmp/test_workspace_linting_%' + ORDER BY schema_name, table_name + """ + ) + results = list(cursor) + logger.info(f"Found {len(results)} table references in database:") + for result in results: + logger.info(f" - {result['schema_name']}.{result['table_name']} (is_write: {result['is_write']})") + + # Expected tables to be found + expected_tables = { + ('sales', 'customers', False), # spark.table("sales.customers") + ('marketing', 'campaigns', False), # FROM 
marketing.campaigns + ('warehouse', 'products', False), # spark.read.table + ('analytics', 'customer_analysis', True), # saveAsTable("analytics.customer_analysis") + ('accounting', 'records', False), # CREATE TABLE accounting.records + } + + # Verify we found the expected tables + assert len(results) == len(expected_tables), ( + f"Expected at least " f"{expected_tables} table references, got {len(results)}" + ) + + # Convert to a set for easier checking + found_tables = {(r['schema_name'], r['table_name'], r['is_write']) for r in results} + + # Check that all expected tables were found + for expected in expected_tables: + assert expected in found_tables, f"Expected table {expected} not found in {found_tables}" + + logger.info(f"Successfully detected {len(results)} table references in notebook") diff --git a/tests/unit/source_code/linters/test_workspace.py b/tests/unit/source_code/linters/test_workspace.py new file mode 100644 index 0000000000..6b99e18b9e --- /dev/null +++ b/tests/unit/source_code/linters/test_workspace.py @@ -0,0 +1,127 @@ +"""Unit tests for WorkspaceCodeLinter.""" + +from unittest.mock import create_autospec +from databricks.sdk.service.workspace import Language, ImportFormat, ObjectType, ExportResponse, ObjectInfo + +from databricks.labs.ucx.source_code.linters.workspace import WorkspaceCodeLinter +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler + +from databricks.labs.ucx.workspace_access.listing import WorkspaceListing + + +class TestWorkspaceCodeLinter: + """Test cases for WorkspaceCodeLinter.""" + + def test_scan_workspace_for_tables_empty_and_none_paths(self, ws, tmp_path, mock_path_lookup, mock_backend): + """Test successful workspace scanning with table detection.""" + # Create mock dependencies + mock_used_tables_crawler = create_autospec(UsedTablesCrawler) + mock_workspace_listing = create_autospec(WorkspaceListing) + + # Mock the WorkspaceListing to return empty results + mock_workspace_listing.walk.return_value = [] # Empty workspace + + # Create the linter instance + linter = WorkspaceCodeLinter( + ws=ws, + sql_backend=mock_backend, + inventory_database="test_db", + path_lookup=mock_path_lookup, + used_tables_crawler=mock_used_tables_crawler, + ) + + # Call the method under test with tmp_path + linter.scan_workspace_for_tables([str(tmp_path)]) + # Call the method under test with empty paths + linter.scan_workspace_for_tables([]) + # Call the method under test with None paths + linter.scan_workspace_for_tables(None) + + mock_used_tables_crawler.dump_all.assert_not_called() + + def test_scan_workspace_for_python_file(self, ws, tmp_path, mock_path_lookup, mock_backend): + """Test successful workspace scanning with table detection.""" + # Create mock dependencies + mock_used_tables_crawler = create_autospec(UsedTablesCrawler) + mock_used_tables_crawler.dump_all.assert_not_called() + + # Create a Python file with table references + python_file_path = tmp_path / "test_script.py" + python_file_path.write_text( + """# Databricks notebook source +# COMMAND ---------- + +# Read from a table +df1 = spark.table("sales.customers") +df2 = spark.sql("SELECT * FROM marketing.campaigns") + +# COMMAND ---------- + +# Write to a table using DataFrame method chaining +df1.write.mode("overwrite").saveAsTable("analytics.customer_analysis") + +# COMMAND ---------- + +# PySpark table operations +spark.read.table("warehouse.products").createOrReplaceTempView("temp_products") +""" + ) + + # Upload the file to the workspace + workspace_path = 
"/tmp/test_workspace_linting.py" + ws.workspace.upload( + path=workspace_path, + content=python_file_path.read_text(), + format=ImportFormat.SOURCE, + language=Language.PYTHON, + overwrite=True, + ) + + # Configure the mock workspace client to return uploaded file when listing + # WorkspaceListing calls ws.workspace.list(path=path, recursive=False) + mock_file_info = ObjectInfo( + object_id="123", + object_type=ObjectType.NOTEBOOK, + path=workspace_path, + language=Language.PYTHON, + ) + + # Mock the workspace.list method to return file + def mock_list_workspace(path): + if path == "/tmp": + return [mock_file_info] + return [] + + # Mock the workspace methods properly + ws.workspace.get_status.return_value = ObjectInfo( + object_id="root", + object_type=ObjectType.DIRECTORY, + path="/tmp", + ) + + ws.workspace.list.side_effect = mock_list_workspace + ws.workspace.export.return_value = ExportResponse(content=python_file_path.read_text()) + + # Create the linter instance + linter = WorkspaceCodeLinter( + ws=ws, + sql_backend=mock_backend, + inventory_database="test_db", + path_lookup=mock_path_lookup, + used_tables_crawler=mock_used_tables_crawler, + ) + + # Scan the workspace for tables + linter.scan_workspace_for_tables(["/tmp"]) + + assert not mock_used_tables_crawler.dump_all.called + + if mock_used_tables_crawler.dump_all.called: + call_args = mock_used_tables_crawler.dump_all.call_args[0][0] + print(f"dump_all called with {len(call_args)} tables") + for table in call_args: + print(f" - {table.schema_name}.{table.table_name} (read: {table.is_read}, write: {table.is_write})") + else: + print("dump_all was not called") + + assert True