WIP Discovery working, need to get tables working, and then refactor
visch committed Feb 16, 2024
1 parent d805bf5 commit a08e4b3
Showing 1 changed file with 157 additions and 0 deletions.
tap_mysql/client.py
@@ -2,12 +2,14 @@
from __future__ import annotations

import datetime
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterable

import singer_sdk.helpers._typing
import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.helpers._typing import TypeConformanceLevel

if TYPE_CHECKING:
@@ -40,6 +42,30 @@ def patched_conform(

class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

    def create_engine(self) -> Engine:
        """Creates and returns a new engine. Do not call outside of _engine.

        NOTE: Do not call this method. The only place that this method should
        be called is inside the self._engine method. If you'd like to access
        the engine on a connector, use self._engine.

        This method exists solely so that tap/target developers can override it
        on their subclass of SQLConnector to perform custom engine creation
        logic.

        Returns:
            A new SQLAlchemy Engine.
        """
        # SSL options passed through to the pymysql driver.
        connect_args = {
            "ssl": {
                "verify_identity": True,
                "verify_cert": True,
                "ca": "/etc/ssl/certs/ca-certificates.crt",
            },
        }
        return sqlalchemy.create_engine(
            self.sqlalchemy_url,
            connect_args=connect_args,
            echo=False,
        )
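
    # Illustrative sketch (not part of this commit): the CA bundle path above
    # is hard-coded. A config-driven variant might look like the following,
    # where "ssl_ca" is a hypothetical setting name:
    #
    #     ca_path = self.config.get("ssl_ca", "/etc/ssl/certs/ca-certificates.crt")
    #     connect_args = {
    #         "ssl": {"verify_identity": True, "verify_cert": True, "ca": ca_path},
    #     }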

    @staticmethod
    def to_jsonschema_type(
@@ -169,6 +195,137 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
if "filter_schemas" in self.config and len(self.config["filter_schemas"]) != 0:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)
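
    # For example (illustrative config values, not part of this commit),
    # discovery can be limited to specific schemas:
    #
    #     {
    #         "sqlalchemy_url": "mysql+pymysql://user:password@localhost:3306",
    #         "filter_schemas": ["analytics", "app"]
    #     }
    #
    # With "filter_schemas" unset or empty, every schema reported by the
    # inspector is discovered.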

    def discover_catalog_entry(
        self,
        engine: Engine,  # noqa: ARG002
        inspected: Inspector,
        schema_name: str,
        table_name: str,
        is_view: bool,  # noqa: FBT001
    ) -> CatalogEntry:
        """Overridden to support Vitess, as DESCRIBE is not supported for views.

        Create a `CatalogEntry` object for the given table or view.

        Args:
            engine: SQLAlchemy engine
            inspected: SQLAlchemy inspector instance for engine
            schema_name: Schema name to inspect
            table_name: Name of the table or view
            is_view: Flag whether this object is a view, returned by `get_object_names`

        Returns:
            `CatalogEntry` object for the given table or view
        """
        # Initialize unique stream name
        unique_stream_id = self.get_fully_qualified_name(
            db_name=None,
            schema_name=schema_name,
            table_name=table_name,
            delimiter="-",
        )

        # Detect key properties.
        # WIP: primary-key reflection via the SQLAlchemy inspector is disabled
        # here because it relies on DESCRIBE, which Vitess does not support for
        # views. Until a replacement lands, no key properties are detected.
        possible_primary_keys: list[list[str]] = []
        # pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
        pk_def = False
        if pk_def and "constrained_columns" in pk_def:
            possible_primary_keys.append(pk_def["constrained_columns"])

        # An element of the columns list is ``None`` if it's an expression and is
        # returned in the ``expressions`` list of the reflected index.
        # WIP: index reflection is disabled for the same DESCRIBE/Vitess reason.
        possible_primary_keys.extend(
            index_def["column_names"]  # type: ignore[misc]
            for index_def in []  # inspected.get_indexes(table_name, schema=schema_name)
            if index_def.get("unique", False)
        )

        key_properties = next(iter(possible_primary_keys), None)

        # Initialize columns list. SHOW COLUMNS works against Vitess views,
        # unlike the DESCRIBE-based inspector reflection.
        table_schema = th.PropertiesList()
        with self._connect() as conn:
            columns = conn.execute(
                sqlalchemy.text(f"SHOW COLUMNS FROM `{schema_name}`.`{table_name}`"),
            ).mappings()
            for column in columns:
                column_name = column["Field"]
                is_nullable = column["Null"] == "YES"
                jsonschema_type: dict = self.to_jsonschema_type(column["Type"])
                table_schema.append(
                    th.Property(
                        name=column_name,
                        wrapped=th.CustomType(jsonschema_type),
                        required=not is_nullable,
                    ),
                )
        schema = table_schema.to_dict()

        # Initialize available replication methods
        addl_replication_methods: list[str] = []  # By default an empty list.
        # Notes regarding replication methods:
        # - 'INCREMENTAL' replication must be enabled by the user by specifying
        #   a replication_key value.
        # - 'LOG_BASED' replication must be enabled by the developer, according
        #   to source-specific implementation capabilities.
        replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))

        # Create the catalog entry object
        return CatalogEntry(
            tap_stream_id=unique_stream_id,
            stream=unique_stream_id,
            table=table_name,
            key_properties=key_properties,
            schema=Schema.from_dict(schema),
            is_view=is_view,
            replication_method=replication_method,
            metadata=MetadataMapping.get_standard_metadata(
                schema_name=schema_name,
                schema=schema,
                replication_method=replication_method,
                key_properties=key_properties,
                valid_replication_keys=None,  # Must be defined by user
            ),
            database=None,  # Expects single-database context
            row_count=None,
            stream_alias=None,
            replication_key=None,  # Must be defined by user
        )
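
    # For reference (illustrative values), a row returned by SHOW COLUMNS has
    # the shape consumed above; only Field, Type, and Null are used:
    #
    #     {"Field": "id", "Type": "bigint(20)", "Null": "NO",
    #      "Key": "PRI", "Default": None, "Extra": "auto_increment"}
    #
    # so a NOT NULL bigint column becomes a required property whose JSON Schema
    # type comes from to_jsonschema_type("bigint(20)").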

    def get_table_columns(
        self,
        full_table_name: str,
        column_names: list[str] | None = None,
    ) -> dict[str, sqlalchemy.Column]:
        """Overridden to support Vitess, as DESCRIBE is not supported for views.

        Return a mapping of table columns.

        Args:
            full_table_name: Fully qualified table name.
            column_names: A list of column names to filter to.

        Returns:
            An ordered mapping of column names to column objects.
        """
        if full_table_name not in self._table_cols_cache:
            _, schema_name, table_name = self.parse_full_table_name(full_table_name)
            with self._connect() as conn:
                # Buffer all rows before the connection closes.
                columns = conn.execute(
                    sqlalchemy.text(f"SHOW COLUMNS FROM `{schema_name}`.`{table_name}`"),
                ).mappings().all()

            self._table_cols_cache[full_table_name] = {
                col_meta["Field"]: sqlalchemy.Column(
                    col_meta["Field"],
                    # SHOW COLUMNS reports the type as a raw string (e.g.
                    # "varchar(255)"); round-trip it through the JSON Schema
                    # mapping to obtain a SQLAlchemy type object.
                    self.to_sql_type(self.to_jsonschema_type(col_meta["Type"])),
                    nullable=col_meta["Null"] == "YES",
                )
                for col_meta in columns
                if not column_names
                or col_meta["Field"].casefold() in {col.casefold() for col in column_names}
            }

        return self._table_cols_cache[full_table_name]
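
    # Illustrative usage (names and URL are examples, not part of this commit):
    #
    #     connector = MySQLConnector(config={"sqlalchemy_url": "mysql+pymysql://..."})
    #     cols = connector.get_table_columns("analytics.orders", column_names=["id"])
    #     # -> {"id": Column("id", <sqlalchemy type>, nullable=False)}
    #
    # Results are cached per table in self._table_cols_cache, so repeated calls
    # for the same stream do not re-issue SHOW COLUMNS.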


class MySQLStream(SQLStream):
