Skip to content

Commit

Permalink
changes for 3-part naming and fixing test integration for datahub (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
rvoona authored May 31, 2024
1 parent f48a0af commit f4f346c
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
2 changes: 1 addition & 1 deletion ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ GRANT AUTHENTICATION v_oauth TO oauth_user;
GRANT ALL ON SCHEMA PUBLIC TO oauth_user;

-- Create a VIEW
CREATE VIEW sampleview AS SELECT SUM(annual_income), customer_state
CREATE VIEW sampleview AS SELECT SUM(annual_income) as sum_annual_income, customer_state
FROM public.customer_dimension
WHERE customer_key IN (SELECT customer_key FROM store.store_sales_fact)
GROUP BY customer_state ORDER BY customer_state ASC;
Expand Down
2 changes: 1 addition & 1 deletion test/sample_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
]

sample_view_columns = [
'SUM',
'sum_annual_income',
'customer_state'

]
Expand Down
2 changes: 1 addition & 1 deletion test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def test_get_all_projection_columns(vconn):

def test__populate_view_lineage(vconn):
res = vconn[0].dialect._populate_view_lineage(connection=vconn[1], view=sample.sample_view ,schema="public")
upstream = "public.customer_dimension"
upstream = "VMart.public.customer_dimension"
downstream = next(iter(res.keys()))
assert res[downstream][0][0] == upstream

Expand Down
19 changes: 10 additions & 9 deletions vertica_sqlalchemy_dialect/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ def get_table_names(self, connection, schema=None, **kw):
get_tables_sql = sql.text(
dedent(
"""
SELECT table_name
SELECT lower(table_name) as table_name
FROM v_catalog.tables
WHERE %(schema_condition)s
ORDER BY table_schema, table_name
Expand Down Expand Up @@ -810,10 +810,10 @@ def get_view_names(self, connection, schema=None, **kw):
get_views_sql = sql.text(
dedent(
"""
SELECT table_name
SELECT lower(table_name) as table_name
FROM v_catalog.views
WHERE %(schema_condition)s
ORDER BY table_schema, table_name
ORDER BY table_schema, lower(table_name)
"""
% {"schema_condition": schema_condition}
)
Expand Down Expand Up @@ -847,7 +847,7 @@ def fetch_view_definitions(self, connection,schema):
for data in connection.execute(view_def):
definition.append({
"view_def": data['VIEW_DEFINITION'],
"table_name": data['table_name']
"table_name": data['table_name'].lower()
})

return definition
Expand Down Expand Up @@ -902,7 +902,7 @@ def fetch_table_columns(self, connection, schema):

columns = []
for row in connection.execute(s):
name = row.column_name
name = row.column_name.lower()
dtype = row.data_type.lower()
default = row.column_default
nullable = row.is_nullable
Expand Down Expand Up @@ -1928,7 +1928,7 @@ def fetch_view_columns(self, connection, schema):
columns = []

for row in connection.execute(s):
name = row.column_name
name = row.column_name.lower()
dtype = row.data_type.lower()
default = row.column_default
nullable = row.is_nullable
Expand Down Expand Up @@ -2052,7 +2052,7 @@ def fetch_view_lineage(self, connection,schema) -> None:
view_upstream_lineage_query = sql.text(
dedent(
"""
select table_name ,table_schema, reference_table_name ,reference_table_schema from v_catalog.view_tables where table_schema = '%(schema)s' """
select (select database_name from v_catalog.databases) database_name,table_name ,table_schema, reference_table_name ,reference_table_schema from v_catalog.view_tables where table_schema = '%(schema)s' """
% {"schema": schema}
)
)
Expand All @@ -2062,6 +2062,7 @@ def fetch_view_lineage(self, connection,schema) -> None:
# refrence_table.append(data)
refrence_table.append(
{
"database_name": data["database_name"],
"reference_table_name": data["reference_table_name"],
"reference_table_schema": data["reference_table_schema"],
"view_name": data["table_name"],
Expand Down Expand Up @@ -2094,9 +2095,9 @@ def _populate_view_lineage(self, connection, view, schema: str) -> None:
for lineage in refrence_table:


downstream = f"{lineage['table_schema']}.{lineage['view_name']}"
downstream = f"{lineage['database_name']}.{lineage['table_schema']}.{lineage['view_name']}"

upstream = f"{lineage['reference_table_schema']}.{lineage['reference_table_name']}"
upstream = f"{lineage['database_name']}.{lineage['reference_table_schema']}.{lineage['reference_table_name']}"

view_upstream: str = upstream
view_name: str = downstream
Expand Down

0 comments on commit f4f346c

Please sign in to comment.