fix(ingest/redshift): Identify materialized views properly + fix connection args support #9368

Merged · 2 commits · Dec 5, 2023
4 changes: 2 additions & 2 deletions metadata-ingestion/docs/sources/redshift/redshift_recipe.yml
@@ -40,8 +40,8 @@ source:

     options:
       connect_args:
-        sslmode: "prefer" # or "require" or "verify-ca"
-        sslrootcert: ~ # needed to unpin the AWS Redshift certificate
+        # check all available options here: https://pypi.org/project/redshift-connector/
+        ssl_insecure: "false" # Specifies if IDP hosts server certificate will be verified

 sink:
   # sink configs
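For orientation, here is a minimal sketch (not part of the diff) of how the `connect_args` above reach the driver once ingestion opens a connection. Host, credentials, and database are hypothetical; `ssl_insecure` is a documented `redshift-connector` option:

```python
# Sketch only: connect_args from the recipe are forwarded as keyword
# arguments to redshift_connector.connect (see the source changes below).
import redshift_connector

conn = redshift_connector.connect(
    host="my-cluster.example.us-east-1.redshift.amazonaws.com",  # hypothetical
    port=5439,
    user="datahub",
    password="<secret>",
    database="dev",
    ssl_insecure=False,  # mirrors the recipe's connect_args entry
)
conn.autocommit = True
```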
8 changes: 2 additions & 6 deletions metadata-ingestion/setup.py
@@ -181,8 +181,8 @@

 redshift_common = {
     # sqlalchemy-redshift 0.8.3 adds support for SQLAlchemy 1.4.x
     "sqlalchemy-redshift>=0.8.3",
-    "psycopg2-binary",
     "GeoAlchemy2",
+    "redshift-connector",
     *sqllineage_lib,
     *path_spec_common,
 }
@@ -365,11 +365,7 @@

     | {"psycopg2-binary", "pymysql>=1.0.2"},
     "pulsar": {"requests"},
     "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
-    "redshift": sql_common
-    | redshift_common
-    | usage_common
-    | {"redshift-connector"}
-    | sqlglot_lib,
+    "redshift": sql_common | redshift_common | usage_common | sqlglot_lib,
     "s3": {*s3_base, *data_lake_profiling},
     "gcs": {*s3_base, *data_lake_profiling},
     "sagemaker": aws_common,
@@ -82,7 +82,7 @@ class RedshiftConfig(

     # large Redshift warehouses. As an example, see this query for the columns:
     # https://github.com/sqlalchemy-redshift/sqlalchemy-redshift/blob/60b4db04c1d26071c291aeea52f1dcb5dd8b0eb0/sqlalchemy_redshift/dialect.py#L745.
     scheme: str = Field(
-        default="redshift+psycopg2",
+        default="redshift+redshift_connector",
         description="",
         hidden_from_schema=True,
     )

Review comment (Collaborator): we should call this out in the upgrade guide as a breaking change, since some config that people previously were using may not work anymore (particularly if certain sslmode options no longer work)
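For context on the breaking change: the `scheme` is the SQLAlchemy dialect prefix the source uses when it builds a connection URL. A before/after sketch with hypothetical credentials (both dialects are registered by the `sqlalchemy-redshift` package):

```python
# Hypothetical connection URLs illustrating the dialect switch.
old_url = "redshift+psycopg2://datahub:<secret>@my-cluster:5439/dev"            # before: psycopg2 driver
new_url = "redshift+redshift_connector://datahub:<secret>@my-cluster:5439/dev"  # after: Amazon's native driver
```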
@@ -170,3 +170,24 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:

                 "The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
             )
         return values
+
+    @root_validator(skip_on_failure=True)
+    def connection_config_compatibility_set(cls, values: Dict) -> Dict:
+        if (
+            ("options" in values and "connect_args" in values["options"])
+            and "extra_client_options" in values
+            and len(values["extra_client_options"]) > 0
+        ):
+            raise ValueError(
+                "Cannot set both `connect_args` and `extra_client_options` in the config. Please use `extra_client_options` only."
+            )
+
+        if "options" in values and "connect_args" in values["options"]:
+            values["extra_client_options"] = values["options"]["connect_args"]
+
+        if values["extra_client_options"]:
+            if values["options"]:
+                values["options"]["connect_args"] = values["extra_client_options"]
+            else:
+                values["options"] = {"connect_args": values["extra_client_options"]}
+        return values
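Restated as a standalone sketch (plain dicts instead of the pydantic model, so it runs in isolation), the validator mirrors the two option spellings into each other and rejects configs that set both:

```python
from typing import Dict


def mirror_connection_options(values: Dict) -> Dict:
    has_connect_args = "options" in values and "connect_args" in values["options"]
    if has_connect_args and values.get("extra_client_options"):
        raise ValueError(
            "Cannot set both `connect_args` and `extra_client_options` in the config. "
            "Please use `extra_client_options` only."
        )
    if has_connect_args:
        # connect_args wins when it is the only spelling set.
        values["extra_client_options"] = values["options"]["connect_args"]
    if values.get("extra_client_options"):
        # Keep both views consistent: the SQLAlchemy path reads
        # options.connect_args, the native driver path reads extra_client_options.
        options = values.get("options") or {}
        options["connect_args"] = values["extra_client_options"]
        values["options"] = options
    return values


print(mirror_connection_options({"options": {"connect_args": {"ssl_insecure": False}}}))
# {'options': {'connect_args': {'ssl_insecure': False}}, 'extra_client_options': {'ssl_insecure': False}}
```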
18 changes: 13 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/query.py
@@ -179,14 +179,18 @@ class RedshiftQuery:

     additional_table_metadata: str = """
         select
-            database,
-            schema,
+            ti.database,
+            ti.schema,
             "table",
             size,
             tbl_rows,
             estimated_visible_rows,
             skew_rows,
-            last_accessed
+            last_accessed,
+            case
+                when smi.name is not null then 1
+                else 0
+            end as is_materialized
         from
             pg_catalog.svv_table_info as ti
             left join (
@@ -198,8 +202,12 @@ class RedshiftQuery:

             group by
                 tbl) as la on
             (la.tbl = ti.table_id)
-        ;
-    """
+        left join stv_mv_info smi on
+            smi.db_name = ti.database
+            and smi.schema = ti.schema
+            and smi.name = ti.table
+        ;
+    """

     @staticmethod
     def stl_scan_based_lineage_query(
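Because `stv_mv_info` contains one row per materialized view, the `left join` above yields a non-null `smi.name` exactly for materialized views, and the `case` folds that into a 0/1 `is_materialized` flag. A minimal consumption sketch (assumes an open `redshift_connector` connection named `conn` and this module's imports):

```python
# Sketch: run the enriched metadata query and report materialized views.
cursor = conn.cursor()
cursor.execute(RedshiftQuery.additional_table_metadata)
field_names = [desc[0] for desc in cursor.description]
for row in cursor.fetchall():
    if row[field_names.index("is_materialized")]:
        schema = row[field_names.index("schema")]
        table = row[field_names.index("table")]
        print(f"{schema}.{table} is a materialized view")
```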
@@ -6,7 +6,6 @@

 import humanfriendly

 # These imports verify that the dependencies are available.
-import psycopg2  # noqa: F401
 import pydantic
 import redshift_connector
@@ -352,15 +351,14 @@ def create(cls, config_dict, ctx):

 def get_redshift_connection(
     config: RedshiftConfig,
 ) -> redshift_connector.Connection:
-    client_options = config.extra_client_options
     host, port = config.host_port.split(":")
     conn = redshift_connector.connect(
         host=host,
         port=int(port),
         user=config.username,
         database=config.database,
         password=config.password.get_secret_value() if config.password else None,
-        **client_options,
+        **config.extra_client_options,
     )

     conn.autocommit = True
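A hedged usage sketch of the simplified path (assuming `RedshiftConfig` from the config module shown earlier; host and credentials hypothetical):

```python
# extra_client_options is splatted directly into redshift_connector.connect.
config = RedshiftConfig(
    host_port="my-cluster.example.us-east-1.redshift.amazonaws.com:5439",
    username="datahub",
    password="<secret>",
    database="dev",
    extra_client_options={"ssl_insecure": False},
)
conn = get_redshift_connection(config)
```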
@@ -641,7 +639,7 @@ def gen_view_dataset_workunits(

         dataset_urn = self.gen_dataset_urn(datahub_dataset_name)
         if view.ddl:
             view_properties_aspect = ViewProperties(
-                materialized=view.type == "VIEW_MATERIALIZED",
+                materialized=view.materialized,
                 viewLanguage="SQL",
                 viewLogic=view.ddl,
             )

(This is the identification fix from the PR title: the catalog query reports tabletype as "MATERIALIZED VIEW", so the old comparison against "VIEW_MATERIALIZED" never matched; the flag is now carried on the view object from the enriched table metadata.)
@@ -40,6 +40,7 @@ class RedshiftTable(BaseTable):

 @dataclass
 class RedshiftView(BaseTable):
     type: Optional[str] = None
+    materialized: bool = False
     columns: List[RedshiftColumn] = field(default_factory=list)
     last_altered: Optional[datetime] = None
     size_in_bytes: Optional[int] = None
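A construction sketch of the new field (values hypothetical; the remaining `BaseTable` fields are assumed to keep their defaults):

```python
view = RedshiftView(
    name="sales_summary",  # hypothetical view name
    type="MATERIALIZED VIEW",
    materialized=True,
    ddl="CREATE MATERIALIZED VIEW sales_summary AS SELECT ...",
)
assert view.materialized
```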
@@ -66,6 +67,7 @@ class RedshiftExtraTableMeta:

     estimated_visible_rows: Optional[int] = None
     skew_rows: Optional[float] = None
     last_accessed: Optional[datetime] = None
+    is_materialized: bool = False


 @dataclass
@@ -148,6 +150,7 @@ def enrich_tables(

                 ],
                 skew_rows=meta[field_names.index("skew_rows")],
                 last_accessed=meta[field_names.index("last_accessed")],
+                is_materialized=meta[field_names.index("is_materialized")],
             )
             if table_meta.schema not in table_enrich:
                 table_enrich.setdefault(table_meta.schema, {})
@@ -173,42 +176,23 @@ def get_tables_and_views(

         logger.info(f"Fetched {len(db_tables)} tables/views from Redshift")
         for table in db_tables:
             schema = table[field_names.index("schema")]
+            table_name = table[field_names.index("relname")]

             if table[field_names.index("tabletype")] not in [
                 "MATERIALIZED VIEW",
                 "VIEW",
             ]:
                 if schema not in tables:
                     tables.setdefault(schema, [])
-                table_name = table[field_names.index("relname")]

-                creation_time: Optional[datetime] = None
-                if table[field_names.index("creation_time")]:
-                    creation_time = table[field_names.index("creation_time")].replace(
-                        tzinfo=timezone.utc
-                    )
-
-                last_altered: Optional[datetime] = None
-                size_in_bytes: Optional[int] = None
-                rows_count: Optional[int] = None
-                if schema in enriched_table and table_name in enriched_table[schema]:
-                    if enriched_table[schema][table_name].last_accessed:
-                        # Mypy seems to be not clever enough to understand the above check
-                        last_accessed = enriched_table[schema][table_name].last_accessed
-                        assert last_accessed
-                        last_altered = last_accessed.replace(tzinfo=timezone.utc)
-                    elif creation_time:
-                        last_altered = creation_time
-
-                    if enriched_table[schema][table_name].size:
-                        # Mypy seems to be not clever enough to understand the above check
-                        size = enriched_table[schema][table_name].size
-                        if size:
-                            size_in_bytes = size * 1024 * 1024
-
-                    if enriched_table[schema][table_name].estimated_visible_rows:
-                        rows = enriched_table[schema][table_name].estimated_visible_rows
-                        assert rows
-                        rows_count = int(rows)
+                (
+                    creation_time,
+                    last_altered,
+                    rows_count,
+                    size_in_bytes,
+                ) = RedshiftDataDictionary.get_table_stats(
+                    enriched_table, field_names, schema, table
+                )

                 tables[schema].append(
                     RedshiftTable(
@@ -231,16 +215,37 @@

             else:
                 if schema not in views:
                     views[schema] = []
+                (
+                    creation_time,
+                    last_altered,
+                    rows_count,
+                    size_in_bytes,
+                ) = RedshiftDataDictionary.get_table_stats(
+                    enriched_table=enriched_table,
+                    field_names=field_names,
+                    schema=schema,
+                    table=table,
+                )
+
+                materialized = False
+                if schema in enriched_table and table_name in enriched_table[schema]:
+                    if enriched_table[schema][table_name].is_materialized:
+                        materialized = True
+
                 views[schema].append(
                     RedshiftView(
                         type=table[field_names.index("tabletype")],
                         name=table[field_names.index("relname")],
                         ddl=table[field_names.index("view_definition")],
-                        created=table[field_names.index("creation_time")],
+                        created=creation_time,
                         comment=table[field_names.index("table_description")],
+                        last_altered=last_altered,
+                        size_in_bytes=size_in_bytes,
+                        rows_count=rows_count,
+                        materialized=materialized,
                     )
                 )

         for schema_key, schema_tables in tables.items():
             logger.info(
                 f"In schema: {schema_key} discovered {len(schema_tables)} tables"
@@ -250,6 +255,39 @@

         return tables, views
+
+    @staticmethod
+    def get_table_stats(enriched_table, field_names, schema, table):
+        table_name = table[field_names.index("relname")]
+
+        creation_time: Optional[datetime] = None
+        if table[field_names.index("creation_time")]:
+            creation_time = table[field_names.index("creation_time")].replace(
+                tzinfo=timezone.utc
+            )
+        last_altered: Optional[datetime] = None
+        size_in_bytes: Optional[int] = None
+        rows_count: Optional[int] = None
+        if schema in enriched_table and table_name in enriched_table[schema]:
+            if enriched_table[schema][table_name].last_accessed:
+                # Mypy seems to be not clever enough to understand the above check
+                last_accessed = enriched_table[schema][table_name].last_accessed
+                assert last_accessed
+                last_altered = last_accessed.replace(tzinfo=timezone.utc)
+            elif creation_time:
+                last_altered = creation_time
+
+            if enriched_table[schema][table_name].size:
+                # Mypy seems to be not clever enough to understand the above check
+                size = enriched_table[schema][table_name].size
+                if size:
+                    size_in_bytes = size * 1024 * 1024
+
+            if enriched_table[schema][table_name].estimated_visible_rows:
+                rows = enriched_table[schema][table_name].estimated_visible_rows
+                assert rows
+                rows_count = int(rows)
+        return creation_time, last_altered, rows_count, size_in_bytes
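One unit worth making explicit: `svv_table_info.size` is reported in 1 MB blocks, which is why the helper multiplies by `1024 * 1024` to get bytes:

```python
size_in_mb_blocks = 42  # svv_table_info.size is in 1 MB data blocks
size_in_bytes = size_in_mb_blocks * 1024 * 1024
assert size_in_bytes == 44_040_192
```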

     @staticmethod
     def get_schema_fields_for_column(
         column: RedshiftColumn,