From 1e334af89cfbe318474103347623e141e591684c Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Wed, 29 Dec 2021 17:33:40 +0100
Subject: [PATCH] Fix linting (#1958)
---
.pylintrc | 6 +-
.../metadata/ingestion/source/sql_source.py | 284 +++++++++++-------
2 files changed, 175 insertions(+), 115 deletions(-)
diff --git a/.pylintrc b/.pylintrc
index 93c953b43640..2e99493ce45b 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,6 +1,10 @@
[BASIC]
# W1203: logging-fstring-interpolation - f-string brings better readability and unifies style
-disable=W1203
+# W1202: logging-format-interpolation - lazy formatting in logging functions
+disable=W1203,W1202
+docstring-min-length=20
+max-args=7
+max-attributes=12
[MASTER]
fail-under=6.0
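
For reference, W1203 and W1202 complain about interpolating values before handing them to the logger instead of using lazy %-formatting; a minimal sketch of the two styles the disabled checks distinguish (logger name and value are illustrative):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
table_name = "fact_sales"  # illustrative value

# Lazy %-formatting: what pylint prefers by default (no W1202/W1203).
logger.info("Table Scanned: %s", table_name)

# f-string interpolation: flagged by W1203, kept in this codebase for readability.
logger.info(f"Table Scanned: {table_name}")
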
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 236a8fc6bffe..184004b44e1c 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -8,6 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""
+Generic source to build SQL connectors.
+"""
import json
import logging
import re
@@ -16,7 +19,7 @@
from abc import abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
-from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
+from typing import Dict, Iterable, List, Optional, Tuple
from urllib.parse import quote_plus
from pydantic import SecretStr
@@ -54,20 +57,22 @@
@dataclass
class SQLSourceStatus(SourceStatus):
+ """
+ Reports the source status after ingestion
+ """
+
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
- def scanned(self, table_name: str) -> None:
- self.success.append(table_name)
- logger.info("Table Scanned: {}".format(table_name))
+ def scanned(self, record: str) -> None:
+ self.success.append(record)
+ logger.info(f"Table Scanned: {record}")
- def filter(
- self, table_name: str, err: str, dataset_name: str = None, col_type: str = None
- ) -> None:
- self.filtered.append(table_name)
- logger.warning("Dropped Table {} due to {}".format(table_name, err))
+ def filter(self, record: str, err: str) -> None:
+ self.filtered.append(record)
+ logger.warning(f"Dropped Table {record} due to {err}")
def build_sql_source_connection_url(
@@ -76,8 +81,11 @@ def build_sql_source_connection_url(
username: Optional[str] = None,
password: Optional[SecretStr] = None,
database: Optional[str] = None,
- options: dict = {},
+ options: Optional[dict] = None,
) -> str:
+ """
+ Helper function to prepare the db URL
+ """
url = f"{scheme}://"
if username is not None:
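
The options argument drops its mutable {} default because a default value is created once and shared across calls; a minimal sketch of the pattern the new Optional[dict] = None signature implies (function name and URL shape are illustrative, not the connector's exact behaviour):

from typing import Optional

def build_url(scheme: str, options: Optional[dict] = None) -> str:
    # Re-create the dict on every call instead of reusing a shared default.
    options = options if options is not None else {}
    params = "&".join(f"{key}={value}" for key, value in options.items())
    return f"{scheme}://" + (f"?{params}" if params else "")

print(build_url("mysql+pymysql", {"charset": "utf8"}))  # mysql+pymysql://?charset=utf8
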
@@ -100,6 +108,12 @@ def build_sql_source_connection_url(
class SQLConnectionConfig(ConfigModel):
+ """
+ Config class containing all supported
+ configurations for an SQL source, including
+ data profiling and DBT generated information.
+ """
+
username: Optional[str] = None
password: Optional[SecretStr] = None
host_port: str
@@ -144,7 +158,8 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
description = None
try:
table_info: dict = inspector.get_table_comment(table, schema)
- except Exception as err:
+ # Catch any exception without breaking the ingestion
+ except Exception as err: # pylint: disable=broad-except
logger.error(f"Table Description Error : {err}")
else:
description = table_info["text"]
@@ -152,6 +167,12 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
class SQLSource(Source[OMetaDatabaseAndTable]):
+ """
+ Source Connector implementation to extract
+ Database & Table information and convert it
+ to OpenMetadata Entities
+ """
+
def __init__(
self,
config: SQLConnectionConfig,
@@ -174,26 +195,38 @@ def __init__(
self.data_profiler = None
self.data_models = {}
if self.config.dbt_catalog_file is not None:
- self.dbt_catalog = json.load(open(self.config.dbt_catalog_file, "r"))
+ with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
+ self.dbt_catalog = json.load(catalog)
if self.config.dbt_manifest_file is not None:
- self.dbt_manifest = json.load(open(self.config.dbt_manifest_file, "r"))
+ with open(self.config.dbt_manifest_file, "r", encoding="utf-8") as manifest:
+ self.dbt_manifest = json.load(manifest)
- def _instantiate_profiler(self):
+ def _instantiate_profiler(self) -> bool:
+ """
+ If the profiler is configured, load it and run.
+
+ Return True if the profiling ran correctly
+ """
try:
if self.config.data_profiler_enabled:
if self.data_profiler is None:
+ # pylint: disable=import-outside-toplevel
from metadata.profiler.dataprofiler import DataProfiler
+ # pylint: enable=import-outside-toplevel
+
self.data_profiler = DataProfiler(
status=self.status, connection_str=self.connection_string
)
return True
- return False
- except Exception:
+ # Catch any errors during profiling init and continue ingestion
+ except Exception as exc: # pylint: disable=broad-except
logger.error(
+ f"Error loading profiler {exc}. "
"DataProfiler configuration is enabled. Please make sure you ran "
"pip install 'openmetadata-ingestion[data-profiler]'"
)
+ return False
def prepare(self):
self._parse_data_model()
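
The DataProfiler import stays inside the method so the connector can run without the optional data-profiler extra; a minimal sketch of that lazy-import pattern under the same assumption (the ImportError fallback here is illustrative):

def load_profiler(connection_string: str, status):
    """Instantiate the profiler only if the optional extra is installed."""
    try:
        # pylint: disable=import-outside-toplevel
        from metadata.profiler.dataprofiler import DataProfiler

        return DataProfiler(status=status, connection_str=connection_string)
    except ImportError:
        # Missing optional dependency: skip profiling and keep ingesting.
        return None
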
@@ -204,15 +237,15 @@ def create(
):
pass
- def type_of_column_name(self, sa_type, table_name: str, column_name: str):
- return sa_type
-
- def standardize_schema_table_names(
- self, schema: str, table: str
- ) -> Tuple[str, str]:
+ @staticmethod
+ def standardize_schema_table_names(schema: str, table: str) -> Tuple[str, str]:
return schema, table
- def fetch_sample_data(self, schema: str, table: str):
+ def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
+ """
+ Get some sample data from the source to be added
+ to the Table Entities
+ """
try:
query = self.config.query.format(schema, table)
logger.info(query)
@@ -221,14 +254,14 @@ def fetch_sample_data(self, schema: str, table: str):
for col in results.keys():
cols.append(col.replace(".", "_DOT_"))
rows = []
- for r in results:
- row = list(r)
+ for res in results:
+ row = list(res)
rows.append(row)
return TableData(columns=cols, rows=rows)
- except Exception as err:
- logger.error(
- "Failed to generate sample data for {} - {}".format(table, err)
- )
+ # Catch any errors and continue the ingestion
+ except Exception as err: # pylint: disable=broad-except
+ logger.error(f"Failed to generate sample data for {table} - {err}")
+ return None
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
inspector = inspect(self.engine)
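
fetch_sample_data relies on config.query being a template with two positional placeholders (schema, table); a rough sketch of the same flow against a plain SQLAlchemy 1.x engine, with an illustrative template:

from sqlalchemy import create_engine

def sample_rows(connection_url: str, schema: str, table: str):
    engine = create_engine(connection_url)
    query = "select * from {}.{} limit 50".format(schema, table)  # illustrative template
    results = engine.execute(query)
    # Mirror the connector's convention of replacing dots in column names.
    cols = [col.replace(".", "_DOT_") for col in results.keys()]
    rows = [list(res) for res in results]
    return cols, rows
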
@@ -236,7 +269,7 @@ def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
if not self.sql_config.schema_filter_pattern.included(schema):
self.status.filter(schema, "Schema pattern not allowed")
continue
- logger.debug("total tables {}".format(inspector.get_table_names(schema)))
+ logger.debug(f"Total tables {inspector.get_table_names(schema)}")
if self.config.include_tables:
yield from self.fetch_tables(inspector, schema)
if self.config.include_views:
@@ -245,6 +278,10 @@ def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
def fetch_tables(
self, inspector: Inspector, schema: str
) -> Iterable[OMetaDatabaseAndTable]:
+ """
+ Scrape an SQL schema and prepare Database and Table
+ OpenMetadata Entities
+ """
for table_name in inspector.get_table_names(schema):
try:
schema, table_name = self.standardize_schema_table_names(
@@ -252,13 +289,11 @@ def fetch_tables(
)
if not self.sql_config.table_filter_pattern.included(table_name):
self.status.filter(
- "{}.{}".format(self.config.get_service_name(), table_name),
+ f"{self.config.get_service_name()}.{table_name}",
"Table pattern not allowed",
)
continue
- self.status.scanned(
- "{}.{}".format(self.config.get_service_name(), table_name)
- )
+ self.status.scanned(f"{self.config.get_service_name()}.{table_name}")
description = _get_table_description(schema, table_name, inspector)
fqn = f"{self.config.service_name}.{self.config.database}.{schema}.{table_name}"
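
The filter patterns consulted above only need an included() check before a table is either scanned or dropped; a hypothetical pattern object with that contract (class name and regex semantics are assumptions, not the project's actual implementation):

import re
from dataclasses import dataclass, field
from typing import List

@dataclass
class FilterPattern:
    includes: List[str] = field(default_factory=lambda: [".*"])
    excludes: List[str] = field(default_factory=list)

    def included(self, name: str) -> bool:
        # Excludes win over includes; everything is allowed by default.
        if any(re.match(pattern, name) for pattern in self.excludes):
            return False
        return any(re.match(pattern, name) for pattern in self.includes)
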
@@ -275,7 +310,8 @@ def fetch_tables(
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
- except Exception as err:
+ # Catch any errors during the ingestion and continue
+ except Exception as err: # pylint: disable=broad-except
logger.error(repr(err))
logger.error(err)
@@ -291,16 +327,19 @@ def fetch_tables(
table=table_entity, database=self._get_database(schema)
)
yield table_and_db
- except Exception as err:
+ # Catch any errors during the ingestion and continue
+ except Exception as err: # pylint: disable=broad-except
logger.error(err)
- self.status.warnings.append(
- "{}.{}".format(self.config.service_name, table_name)
- )
+ self.status.warnings.append(f"{self.config.service_name}.{table_name}")
continue
def fetch_views(
self, inspector: Inspector, schema: str
) -> Iterable[OMetaDatabaseAndTable]:
+ """
+ Get all views in the SQL schema and prepare
+ Database & Table OpenMetadata Entities
+ """
for view_name in inspector.get_view_names(schema):
try:
if self.config.scheme == "bigquery":
@@ -309,12 +348,11 @@ def fetch_views(
)
if not self.sql_config.table_filter_pattern.included(view_name):
self.status.filter(
- "{}.{}".format(self.config.get_service_name(), view_name),
+ f"{self.config.get_service_name()}.{view_name}",
"View pattern not allowed",
)
continue
try:
-
if self.config.scheme == "bigquery":
view_definition = inspector.get_view_definition(
f"{self.config.project_id}.{schema}.{view_name}"
@@ -329,17 +367,15 @@ def fetch_views(
except NotImplementedError:
view_definition = ""
- description = _get_table_description(schema, view_name, inspector)
- table_columns = self._get_columns(schema, view_name, inspector)
- view_name = view_name.replace(".", "_DOT_")
- fqn = f"{self.config.service_name}.{self.config.database}.{schema}.{view_name}"
table = Table(
id=uuid.uuid4(),
- name=view_name,
+ name=view_name.replace(".", "_DOT_"),
tableType="View",
- description=description if description is not None else " ",
- fullyQualifiedName=fqn,
- columns=table_columns,
+ description=_get_table_description(schema, view_name, inspector)
+ or "",
+ # This will be generated in the backend!! #1673
+ fullyQualifiedName=view_name,
+ columns=self._get_columns(schema, view_name, inspector),
viewDefinition=view_definition,
)
if self.sql_config.generate_sample_data:
@@ -350,31 +386,34 @@ def fetch_views(
table=table, database=self._get_database(schema)
)
yield table_and_db
- except Exception as err:
+ # Catch any errors and continue the ingestion
+ except Exception as err: # pylint: disable=broad-except
logger.error(err)
- self.status.warnings.append(
- "{}.{}".format(self.config.service_name, view_name)
- )
+ self.status.warnings.append(f"{self.config.service_name}.{view_name}")
continue
def _parse_data_model(self):
-
+ """
+ Get all the DBT information and feed it to the Table Entity
+ """
if self.config.dbt_manifest_file and self.config.dbt_catalog_file:
logger.info("Parsing Data Models")
- manifest_nodes = self.dbt_manifest["nodes"]
- manifest_sources = self.dbt_manifest["sources"]
- manifest_entities = {**manifest_nodes, **manifest_sources}
- catalog_nodes = self.dbt_catalog["nodes"]
- catalog_sources = self.dbt_catalog["sources"]
- catalog_entities = {**catalog_nodes, **catalog_sources}
+ manifest_entities = {
+ **self.dbt_manifest["nodes"],
+ **self.dbt_manifest["sources"],
+ }
+ catalog_entities = {
+ **self.dbt_catalog["nodes"],
+ **self.dbt_catalog["sources"],
+ }
for key, mnode in manifest_entities.items():
name = mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
cnode = catalog_entities.get(key)
- if cnode is not None:
- columns = self._parse_data_model_columns(name, mnode, cnode)
- else:
- columns = []
+ columns = (
+ self._parse_data_model_columns(name, mnode, cnode) if cnode else []
+ )
+
if mnode["resource_type"] == "test":
continue
upstream_nodes = self._parse_data_model_upstream(mnode)
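
dbt's manifest and catalog are both keyed by the node's unique id, which is why the nodes and sources maps can simply be merged and matched up; a tiny illustrative sketch with made-up ids:

dbt_manifest = {
    "nodes": {"model.demo.orders": {"name": "orders", "resource_type": "model"}},
    "sources": {"source.demo.raw.orders": {"name": "orders", "resource_type": "source"}},
}
dbt_catalog = {
    "nodes": {"model.demo.orders": {"columns": {"id": {"type": "BIGINT", "index": 1}}}},
    "sources": {},
}

manifest_entities = {**dbt_manifest["nodes"], **dbt_manifest["sources"]}
catalog_entities = {**dbt_catalog["nodes"], **dbt_catalog["sources"]}

for key, mnode in manifest_entities.items():
    cnode = catalog_entities.get(key)  # None when the catalog has no entry
    print(key, mnode["name"], cnode is not None)
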
@@ -382,14 +421,12 @@ def _parse_data_model(self):
mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
)
model_name = model_name.replace(".", "_DOT_")
- description = mnode.get("description", "")
schema = mnode["schema"]
- path = f"{mnode['root_path']}/{mnode['original_file_path']}"
raw_sql = mnode.get("raw_sql", "")
model = DataModel(
modelType=ModelType.DBT,
- description=description,
- path=path,
+ description=mnode.get("description", ""),
+ path=f"{mnode['root_path']}/{mnode['original_file_path']}",
rawSql=raw_sql,
sql=mnode.get("compiled_sql", raw_sql),
columns=columns,
@@ -403,12 +440,14 @@ def _parse_data_model_upstream(self, mnode):
if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
for node in mnode["depends_on"]["nodes"]:
try:
- node_type, database, table = node.split(".", 2)
+ _, database, table = node.split(".", 2)
table = table.replace(".", "_DOT_")
table_fqn = f"{self.config.service_name}.{database}.{table}"
upstream_nodes.append(table_fqn)
- except Exception:
- logger.error(f"Failed to parse the node {node} to capture lineage")
+ except Exception as err: # pylint: disable=broad-except
+ logger.error(
+ f"Failed to parse the node {node} to capture lineage {err}"
+ )
continue
return upstream_nodes
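
node.split(".", 2) relies on dbt dependency ids of the form <resource_type>.<package>.<name...>; a small illustrative check of the lineage FQN built from one (service name is made up):

service_name = "snowflake_prod"
node = "source.jaffle_shop.raw.orders"
_, database, table = node.split(".", 2)
table = table.replace(".", "_DOT_")
table_fqn = f"{service_name}.{database}.{table}"
assert table_fqn == "snowflake_prod.jaffle_shop.raw_DOT_orders"
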
@@ -443,8 +482,8 @@ def _parse_data_model_columns(
ordinalPosition=ccolumn["index"],
)
columns.append(col)
- except Exception as e:
- logger.error(f"Failed to parse column type due to {e}")
+ except Exception as err: # pylint: disable=broad-except
+ logger.error(f"Failed to parse column type due to {err}")
return columns
@@ -454,33 +493,58 @@ def _get_database(self, schema: str) -> Database:
service=EntityReference(id=self.service.id, type=self.config.service_type),
)
- def parse_raw_data_type(self, raw_data_type):
- return raw_data_type
+ @staticmethod
+ def _get_column_constraints(
+ column, pk_columns, unique_columns
+ ) -> Optional[Constraint]:
+ """
+ Prepare column constraints for the Table Entity
+ """
+ constraint = None
+
+ if column["nullable"]:
+ constraint = Constraint.NULL
+ elif not column["nullable"]:
+ constraint = Constraint.NOT_NULL
+
+ if column["name"] in pk_columns:
+ constraint = Constraint.PRIMARY_KEY
+ elif column["name"] in unique_columns:
+ constraint = Constraint.UNIQUE
+
+ return constraint
def _get_columns(
self, schema: str, table: str, inspector: Inspector
- ) -> List[Column]:
+ ) -> Optional[List[Column]]:
+ """
+ Get column types and constraint information
+ """
+
+ # Get inspector information:
pk_constraints = inspector.get_pk_constraint(table, schema)
+ try:
+ unique_constraints = inspector.get_unique_constraints(table, schema)
+ except NotImplementedError:
+ logger.warning("Cannot obtain constraints - NotImplementedError")
+ unique_constraints = []
+
pk_columns = (
pk_constraints["column_constraints"]
if len(pk_constraints) > 0 and "column_constraints" in pk_constraints.keys()
else {}
)
- unique_constraints = []
- try:
- unique_constraints = inspector.get_unique_constraints(table, schema)
- except NotImplementedError:
- pass
- unique_columns = []
- for constraint in unique_constraints:
- if "column_names" in constraint.keys():
- unique_columns = constraint["column_names"]
+
+ unique_columns = [
+ column_name
+ for constraint in unique_constraints if "column_names" in constraint.keys()
+ for column_name in constraint["column_names"]
+ ]
+
dataset_name = f"{schema}.{table}"
- columns = inspector.get_columns(table, schema)
table_columns = []
- row_order = 1
try:
- for column in columns:
+ for row_order, column in enumerate(inspector.get_columns(table, schema)):
if "." in column["name"]:
logger.info(f"Found '.' in {column['name']}")
column["name"] = column["name"].replace(".", "_DOT_")
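
A quick sketch of the precedence in _get_column_constraints (primary key and unique override the plain nullability flags), using inspector-style column dicts; the values are illustrative and the class is imported from the module this patch touches:

from metadata.ingestion.source.sql_source import SQLSource

pk_columns = ["id"]
unique_columns = ["email"]
inspector_columns = [
    {"name": "id", "nullable": False},
    {"name": "email", "nullable": False},
    {"name": "note", "nullable": True},
]

for column in inspector_columns:
    # pylint: disable=protected-access
    print(column["name"], SQLSource._get_column_constraints(column, pk_columns, unique_columns))
# Expected: id -> PRIMARY_KEY, email -> UNIQUE, note -> NULL
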
@@ -489,9 +553,6 @@ def _get_columns(
col_data_length = None
arr_data_type = None
if "raw_data_type" in column and column["raw_data_type"] is not None:
- column["raw_data_type"] = self.parse_raw_data_type(
- column["raw_data_type"]
- )
(
col_type,
data_type_display,
@@ -514,26 +575,16 @@ def _get_columns(
r"(?:\w*)(?:[(]*)(\w*)(?:.*)", str(column["type"])
).groups()
data_type_display = column["type"]
- col_constraint = None
- if column["nullable"]:
- col_constraint = Constraint.NULL
- elif not column["nullable"]:
- col_constraint = Constraint.NOT_NULL
- if column["name"] in pk_columns:
- col_constraint = Constraint.PRIMARY_KEY
- elif column["name"] in unique_columns:
- col_constraint = Constraint.UNIQUE
- if col_type.upper() in ["CHAR", "VARCHAR", "BINARY", "VARBINARY"]:
+
+ col_constraint = self._get_column_constraints(
+ column, pk_columns, unique_columns
+ )
+
+ if col_type.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
col_data_length = column["type"].length
if col_data_length is None:
col_data_length = 1
try:
- if col_type == "NULL":
- col_type = self.type_of_column_name(
- col_type,
- column_name=column["name"],
- table_name=dataset_name,
- )
if col_type == "NULL":
col_type = "VARCHAR"
data_type_display = "varchar"
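
The fallback regex above captures the first word that follows an optional opening parenthesis in the rendered column type; two illustrative matches:

import re

TYPE_PATTERN = r"(?:\w*)(?:[(]*)(\w*)(?:.*)"

assert re.match(TYPE_PATTERN, "ARRAY(VARCHAR)").groups() == ("VARCHAR",)
assert re.match(TYPE_PATTERN, "VARCHAR(255)").groups() == ("255",)
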
@@ -544,27 +595,32 @@ def _get_columns(
name=column["name"],
description=column.get("comment", None),
dataType=col_type,
- dataTypeDisplay="{}({})".format(col_type, col_data_length)
+ dataTypeDisplay=f"{col_type}({col_data_length})"
if data_type_display is None
else f"{data_type_display}",
dataLength=col_data_length,
constraint=col_constraint,
- ordinalPosition=row_order,
- children=children if children is not None else None,
+ ordinalPosition=row_order + 1, # enumerate starts at 0
+ children=children,
arrayDataType=arr_data_type,
)
- except Exception as err:
+ except Exception as err: # pylint: disable=broad-except
logger.error(traceback.format_exc())
logger.error(traceback.print_exc())
logger.error(f"{err} : {column}")
continue
table_columns.append(om_column)
- row_order = row_order + 1
return table_columns
- except Exception as err:
- logger.error("{}: {} {}".format(repr(err), table, err))
+ except Exception as err: # pylint: disable=broad-except
+ logger.error(f"{repr(err)}: {table} {err}")
+ return None
def run_data_profiler(self, table: str, schema: str) -> TableProfile:
+ """
+ Run the profiler for a table in a schema.
+
+ Prepare source-specific naming, e.g. bigquery
+ """
dataset_name = f"{schema}.{table}"
self.status.scanned(f"profile of {dataset_name}")
logger.info(