Skip to content

Commit

Permalink
Merge branch 'main' into skip-ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
chirag-madlani authored Jan 10, 2024
2 parents 473560c + c27b0cf commit 868027f
Showing 7 changed files with 94 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -249,6 +249,23 @@ def query_table_names_and_types(
for table_name in self.inspector.get_table_names(schema_name) or []
]

def query_view_names_and_types(
    self, schema_name: str
) -> Iterable[TableNameAndType]:
    """
    List the views of a schema together with their table type.

    The default implementation asks the inspector for the view names
    and tags every one of them as a plain View. Sources that need to
    distinguish other kinds of views (e.g., materialized views) can
    override this method.

    :param schema_name: schema whose views are listed
    :return: one TableNameAndType per view; empty when the inspector
        returns nothing
    """
    # The inspector result may be falsy; treat that as "no views"
    view_names = self.inspector.get_view_names(schema_name) or []
    return [
        TableNameAndType(name=view_name, type_=TableType.View)
        for view_name in view_names
    ]

def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"""
Handle table and views.
@@ -288,8 +305,10 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
yield table_name, table_and_type.type_

if self.source_config.includeViews:
for view_name in self.inspector.get_view_names(schema_name):
view_name = self.standardize_table_name(schema_name, view_name)
for view_and_type in self.query_view_names_and_types(schema_name):
view_name = self.standardize_table_name(
schema_name, view_and_type.name
)
view_fqn = fqn.build(
self.metadata,
entity_type=Table,
@@ -310,7 +329,7 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
"Table Filtered Out",
)
continue
yield view_name, TableType.View
yield view_name, view_and_type.type_
except Exception as err:
logger.warning(
f"Fetching tables names failed for schema {schema_name} due to - {err}"
@@ -320,7 +339,7 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
def get_view_definition(
self, table_type: str, table_name: str, schema_name: str, inspector: Inspector
) -> Optional[str]:
if table_type == TableType.View:
if table_type in (TableType.View, TableType.MaterializedView):
try:
view_definition = inspector.get_view_definition(table_name, schema_name)
view_definition = (
Original file line number Diff line number Diff line change
@@ -135,6 +135,15 @@ def is_query_valid(self, row) -> bool:
or query_text.startswith(QUERY_WITH_OM_VERSION)
)

def list_jobs_test_connection(self) -> None:
    """
    Probe the jobs-list endpoint to validate the connection.

    Sends one GET request to ``self.jobs_list_url`` asking for at most
    a single job and raises when the call does not succeed. Nothing is
    returned: the method is only useful for its success/failure outcome.

    Raises:
        Exception: whatever the HTTP client raises for a transport
            failure, an HTTP error status, or a body that is not JSON.
    """
    # limit=1 keeps the probe cheap; we only care that the call works
    payload = {"limit": 1, "expand_tasks": True, "offset": 0}
    response = self.client.get(
        self.jobs_list_url,
        data=json.dumps(payload),
        headers=self.headers,
        timeout=API_TIMEOUT,
    )
    # Previously only the body was parsed, so an error status (e.g. 401/403)
    # that carried a JSON error payload went unnoticed.
    # NOTE(review): assumes self.client returns a requests-style Response
    # (the code already relied on `.json()`) - confirm against the client.
    response.raise_for_status()
    # Still parse the body so that a non-JSON reply surfaces as an error
    response.json()

def list_jobs(self) -> List[dict]:
"""
Method returns List all the created jobs in a Databricks Workspace
Original file line number Diff line number Diff line change
@@ -88,6 +88,7 @@
get_unique_constraints,
get_view_definition,
get_view_names,
get_view_names_reflection,
normalize_names,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
@@ -121,6 +122,7 @@
get_schema_columns
)
Inspector.get_table_names = get_table_names_reflection
Inspector.get_view_names = get_view_names_reflection
SnowflakeDialect._current_database_schema = ( # pylint: disable=protected-access
_current_database_schema
)
@@ -518,6 +520,33 @@ def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
)
)

def query_view_names_and_types(
    self, schema_name: str
) -> Iterable[TableNameAndType]:
    """
    List the regular and the materialized views of a schema.

    The inspector is queried twice: once without extra arguments, whose
    names are tagged as View, and once with ``materialized_views=True``,
    whose names are tagged as MaterializedView.

    :param schema_name: schema whose views are listed
    :return: regular views first, followed by the materialized views
    """
    views = [
        TableNameAndType(name=name, type_=TableType.View)
        for name in self.inspector.get_view_names(schema_name) or []
    ]

    # Second pass: the keyword argument asks for materialized views instead
    materialized_names = (
        self.inspector.get_view_names(schema_name, materialized_views=True) or []
    )
    views.extend(
        TableNameAndType(name=name, type_=TableType.MaterializedView)
        for name in materialized_names
    )

    return views

def get_stored_procedures(self) -> Iterable[SnowflakeStoredProcedure]:
"""List Snowflake stored procedures"""
if self.source_config.includeStoredProcedures:
Original file line number Diff line number Diff line change
@@ -67,6 +67,12 @@
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'VIEW'
"""

# Query template listing the names of the materialized views of a schema.
# The '{}' placeholder is meant to be filled with the schema name through
# str.format before the statement is executed.
SNOWFLAKE_GET_MVIEW_NAMES = """
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}' and TABLE_TYPE = 'MATERIALIZED VIEW'
"""

SNOWFLAKE_GET_TRANSIENT_NAMES = """
select TABLE_NAME from information_schema.tables
where TABLE_SCHEMA = '{}'
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_COMMENTS,
SNOWFLAKE_GET_EXTERNAL_TABLE_NAMES,
SNOWFLAKE_GET_MVIEW_NAMES,
SNOWFLAKE_GET_SCHEMA_COLUMNS,
SNOWFLAKE_GET_TRANSIENT_NAMES,
SNOWFLAKE_GET_VIEW_NAMES,
@@ -74,6 +75,20 @@ def get_table_names_reflection(self, schema=None, **kw):
)


def get_view_names_reflection(self, schema=None, **kw):
    """Return the view names found in `schema`.

    Opens an operation context and delegates to the dialect's
    ``get_view_names``, forwarding the inspector's info cache and any
    extra keyword arguments unchanged.

    :param schema: Optional, retrieve names from a non-default schema.
     For special quoting, use :class:`.quoted_name`.
    :param kw: additional keyword arguments handed to the dialect as-is.
    """

    with self._operation_context() as connection:  # pylint: disable=protected-access
        return self.dialect.get_view_names(
            connection,
            schema,
            info_cache=self.info_cache,
            **kw,
        )


def get_table_names(self, connection, schema, **kw):
query = SNOWFLAKE_GET_WITHOUT_TRANSIENT_TABLE_NAMES
if kw.get("include_transient_tables"):
@@ -86,8 +101,12 @@ def get_table_names(self, connection, schema, **kw):
return result


def get_view_names(self, connection, schema, **kw): # pylint: disable=unused-argument
cursor = connection.execute(SNOWFLAKE_GET_VIEW_NAMES.format(schema))
def get_view_names(self, connection, schema, **kw):
    """
    Return the normalized names of the views found in `schema`.

    When the ``materialized_views`` keyword is truthy the materialized
    view query is used; otherwise the regular view query is used.

    :param connection: connection used to execute the statement
    :param schema: schema name formatted into the query template
    :param kw: optional flags; only ``materialized_views`` is read here
    """
    template = (
        SNOWFLAKE_GET_MVIEW_NAMES
        if kw.get("materialized_views")
        else SNOWFLAKE_GET_VIEW_NAMES
    )
    rows = connection.execute(template.format(schema))
    # The first column of every row holds the view name
    return [self.normalize_name(record[0]) for record in rows]

Original file line number Diff line number Diff line change
@@ -44,11 +44,7 @@ def test_connection(
of a metadata workflow or during an Automation Workflow
"""

def custom_executor_for_pipeline():
result = client.list_jobs()
return list(result)

test_fn = {"GetPipelines": custom_executor_for_pipeline}
test_fn = {"GetPipelines": client.list_jobs_test_connection}

test_connection_steps(
metadata=metadata,
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ import { ERROR_PLACEHOLDER_TYPE } from '../../enums/common.enum';
import { EntityType } from '../../enums/entity.enum';
import { DatabaseSchema } from '../../generated/entity/data/databaseSchema';
import { Table } from '../../generated/entity/data/table';
import entityUtilClassBase from '../../utils/EntityUtilClassBase';
import { getEntityName } from '../../utils/EntityUtils';

interface SchemaTablesTabProps {
@@ -80,7 +81,10 @@ function SchemaTablesTab({
<Link
className="break-word"
data-testid="table-name-link"
to={(EntityType.TABLE, record.fullyQualifiedName as string)}>
to={entityUtilClassBase.getEntityLink(
EntityType.TABLE,
record.fullyQualifiedName as string
)}>
{getEntityName(record)}
</Link>
</div>

0 comments on commit 868027f

Please sign in to comment.