feat: planet scale working #45

Merged: 26 commits on Feb 23, 2024
Commits
a08e4b3
WIP Discovery working, need to get tables working, and then refactor
visch Feb 16, 2024
7359da5
Working with access issues fitlered out
visch Feb 17, 2024
248be32
Works with 100k+ tables
visch Feb 17, 2024
1b155d9
Fix decimals
visch Feb 17, 2024
49f125a
Dynamic PlanetScale detection
visch Feb 17, 2024
a1a0580
Readme updated, planet scale working
visch Feb 17, 2024
f06bffd
Swap config name for sqlalchemy options
visch Feb 19, 2024
6c90cfb
Add is_vitess configuration
visch Feb 20, 2024
685a2e9
Merge main's poetry.lock file in
visch Feb 20, 2024
84fc4b3
Make linter happy
visch Feb 21, 2024
8c2d124
Fix config validation, and messup with connect function call
visch Feb 21, 2024
8809b65
Passes all tests, squashed some bugs!
visch Feb 21, 2024
7644319
make mypy happy
visch Feb 21, 2024
0b07d39
PlanetScale tap pointer
visch Feb 21, 2024
c84d384
fix merge conflicts
visch Feb 22, 2024
f7520c9
Fix sqlalchemy_options documentation
visch Feb 22, 2024
6c0e974
Fix vitess config check
visch Feb 22, 2024
fa8ac91
Update README.md
visch Feb 22, 2024
7e2a11b
Update README.md
visch Feb 22, 2024
3ac1ceb
Apply Edgars suggestions
visch Feb 22, 2024
d986bba
Merge branch 'planet_scale' of github.com:MeltanoLabs/tap-mysql into …
visch Feb 22, 2024
164b94f
Match tap.py config docs with README
visch Feb 22, 2024
6c40462
A bit cleaner is_vitess check
visch Feb 22, 2024
1b34310
config key doesn't get set when value is None. I didn't expect that!
visch Feb 22, 2024
563e229
Merge branch 'main' into planet_scale
edgarrmondragon Feb 23, 2024
76b0b11
Merge branch 'main' into planet_scale
visch Feb 23, 2024
41 changes: 41 additions & 0 deletions README.md
@@ -30,6 +30,10 @@ sudo apt-get install pkg-config libmysqlclient-dev
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified MySQL schemas and ignore others. If left blank, the tap automatically processes ALL available MySQL schemas. |
| is_vitess | False | None | By default we'll check if the database is a Vitess instance. If you'd rather not automatically check, set this to `False`. See the Vitess/PlanetScale documentation below for more information. |
| sqlalchemy_options | False | None | This needs to be passed in as a JSON object. These are `sqlalchemy_url` options (also called the query); to connect to PlanetScale you must turn on SSL. See the PlanetScale section below for details. Note: if `sqlalchemy_url` is set this will be ignored. |
| sqlalchemy_url | False | None | Example mysql://[username]:[password]@localhost:3306/[db_name] |
| ssh_tunnel | False | None | SSH Tunnel Configuration; this is a JSON object. |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as a bastion host); see the other ssh_tunnel.* properties for more details. |
@@ -125,6 +129,7 @@ After everything has been configured, be sure to indicate your use of an ssh tunnel

You can easily run `tap-mysql` by itself or in a pipeline using [Meltano](https://meltano.com/).


### Executing the Tap Directly

```bash
tap-mysql --help
tap-mysql --config CONFIG --discover > ./catalog.json
```

### PlanetScale (Vitess) Support

To connect to PlanetScale you need to use SSL.

Config example in `meltano.yml`:
```yaml
host: aws.connect.psdb.cloud
user: 01234fdsoi99
database: tap-mysql
sqlalchemy_options:
  ssl_ca: "/etc/ssl/certs/ca-certificates.crt"
  ssl_verify_cert: "true"
  ssl_verify_identity: "true"
```
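
For reference, these options end up as the query portion of the SQLAlchemy URL, which is how SSL gets switched on. A minimal sketch of that translation (the driver name, password, and port below are placeholders, not the tap's exact internals):

```python
from sqlalchemy.engine import URL

# Options mirroring the example above.
sqlalchemy_options = {
    "ssl_ca": "/etc/ssl/certs/ca-certificates.crt",
    "ssl_verify_cert": "true",
    "ssl_verify_identity": "true",
}

url = URL.create(
    drivername="mysql",  # assumed driver name, for illustration
    username="01234fdsoi99",
    password="<password>",
    host="aws.connect.psdb.cloud",
    port=3306,
    database="tap-mysql",
    query=sqlalchemy_options,  # rendered as ?ssl_ca=...&ssl_verify_cert=...
)
print(url.render_as_string(hide_password=True))
```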

Example `select` in `meltano.yml` (which excludes tables that will fail):
```yaml
select:
- "*.*"
- "!information_schema-PROFILING.*"
- "!performance_schema-log_status.*"
```

We have some unique handling in tap-mysql because the `DESCRIBE` command does not work for views in PlanetScale. As a result, type detection for Vitess views does not match standard tap-mysql behavior 100% for all types: a warning is logged whenever a type is not supported and it is defaulted to a string. Two examples of this are the `enum` and `set` types.
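
A rough sketch of that fallback, using the same `ischema_names` lookup the connector's `get_sqlalchemy_type` relies on (simplified, not the exact implementation):

```python
from sqlalchemy.dialects.mysql import base as mysql_base


def to_sqlalchemy_type(col_meta_type: str):
    """Sketch of the enum/set fallback described above (simplified)."""
    # "varchar(97)" -> "varchar", "bigint unsigned" -> "bigint"
    base_type_name = col_meta_type.split("(")[0].split(" ")[0].lower()
    if base_type_name in {"enum", "set"}:
        # Unsupported types default to a string type; the real connector
        # also logs a warning here.
        base_type_name = "varchar"
    type_class = mysql_base.dialect().ischema_names[base_type_name]
    return type_class()


print(to_sqlalchemy_type("enum('a','b')"))    # VARCHAR
print(to_sqlalchemy_type("bigint unsigned"))  # BIGINT
```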

The core issue is shown by running the SQL command below:

```sql
> describe information_schema.collations;
ERROR 1049 (42000): VT05003: unknown database 'information_schema' in vschema
```
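
Instead, for Vitess views the tap builds its schema from `SHOW COLUMNS` on the fully qualified name, which Vitess does support. A minimal standalone sketch of that workaround (the connection URL here is an assumption for illustration; see `discover_catalog_entry` and `get_table_columns` in `tap_mysql/client.py` for the real logic):

```python
import sqlalchemy

# Assumed connection URL, for illustration only (a real PlanetScale
# connection also needs the SSL options shown earlier).
engine = sqlalchemy.create_engine(
    "mysql://01234fdsoi99:<password>@aws.connect.psdb.cloud:3306/tap-mysql"
)

schema_name, table_name = "information_schema", "collations"
with engine.connect() as conn:
    result = conn.execute(
        sqlalchemy.text(f"SHOW columns from `{schema_name}`.`{table_name}`")
    )
    for column in result.mappings():
        # Each row carries Field, Type, Null, Key, Default, Extra.
        print(column["Field"], column["Type"], column["Null"] == "YES")
```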

#### PlanetScale Supported Tap
Note that PlanetScale maintains its own Singer tap, located at https://github.com/planetscale/singer-tap/.
It's written in Go, and it also supports log-based replication.
It is a great alternative to this tap if you're using PlanetScale.

## Developer Resources

Follow these instructions to contribute to this project.
20 changes: 20 additions & 0 deletions meltano.yml
@@ -22,16 +22,36 @@ plugins:
- name: user
- name: password
kind: password
sensitive: true
- name: database
- name: options
kind: object
- name: sqlalchemy_url
kind: password
sensitive: true
- name: ssh_tunnel.private_key
kind: password
sensitive: true
- name: ssh_tunnel.private_key_password
kind: password
sensitive: true
- name: ssh_tunnel.host
- name: ssh_tunnel.username
- name: ssh_tunnel.port
config:
host: aws.connect.psdb.cloud
user: 0fiqne6txvcqtjbdywan
database: tap-mysql
sqlalchemy_options:
ssl_ca: "/etc/ssl/certs/ca-certificates.crt"
ssl_verify_cert: "true"
ssl_verify_identity: "true"
select:
- "*.*"
- "!information_schema-PROFILING.*"
- "!mysql-time_zone.*"
- "!mysql-time_zone_transition.*"
- "!performance_schema-log_status.*"
loaders:
- name: target-jsonl
variant: andyh1203
34 changes: 34 additions & 0 deletions plugins/loaders/target-jsonl--andyh1203.lock
@@ -0,0 +1,34 @@
{
"plugin_type": "loaders",
"name": "target-jsonl",
"namespace": "target_jsonl",
"variant": "andyh1203",
"label": "JSON Lines (JSONL)",
"docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
"repo": "https://github.com/andyh1203/target-jsonl",
"pip_url": "target-jsonl",
"description": "JSONL loader",
"logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
"settings": [
{
"name": "destination_path",
"kind": "string",
"value": "output",
"label": "Destination Path",
"description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
},
{
"name": "do_timestamp_file",
"kind": "boolean",
"value": false,
"label": "Include Timestamp in File Names",
"description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
},
{
"name": "custom_name",
"kind": "string",
"label": "Custom File Name Override",
"description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
}
]
}
220 changes: 213 additions & 7 deletions tap_mysql/client.py
@@ -8,6 +8,7 @@
import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.helpers._typing import TypeConformanceLevel

if TYPE_CHECKING:
@@ -23,9 +24,9 @@ def patched_conform(
elem: Any, # noqa: ANN401
property_schema: dict,
) -> Any: # noqa: ANN401
"""Override Singer SDK type conformance to prevent dates turning into datetimes.
"""Override type conformance to prevent dates turning into datetimes.

Converts a primitive (i.e. not object or array) to a json compatible type.
Converts a primitive to a json compatible type.

Returns:
The appropriate json compatible type.
@@ -41,6 +42,44 @@
class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the SQL connector.

This method initializes the SQL connector with the provided arguments.
It can accept variable-length arguments and keyword arguments to
customize the connection settings.

Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self.is_vitess = self.config.get("is_vitess")

if self.is_vitess is None:
self.logger.info(
"No is_vitess configuration provided, dynamically checking if "
"we are using a Vitess instance."
)
with self._connect() as conn:
output = conn.execute(
"select variable_value from "
"performance_schema.global_variables where "
"variable_name='version_comment' and variable_value like "
"'PlanetScale%%'"
)
rows = output.fetchall()
if len(rows) > 0:
self.logger.info(
"Instance has been detected to be a "
"Vitess (PlanetScale) instance, using Vitess "
"configuration."
)
self.is_vitess = True
self.logger.info(
"Instance is not a Vitess instance, using standard configuration."
)

@staticmethod
def to_jsonschema_type(
sql_type: str # noqa: ANN401
@@ -52,15 +91,16 @@

Overridden from SQLConnector to correctly handle JSONB and Arrays.

By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
types.
By default will call `typing.to_jsonschema_type()` for strings and
SQLAlchemy types.

Args:
sql_type: The string representation of the SQL type, a SQLAlchemy
TypeEngine class or object, or a custom-specified object.

Raises:
ValueError: If the type received could not be translated to jsonschema.
ValueError: If the type received could not be translated to
jsonschema.

Returns:
The JSON Schema representation of the provided type.
@@ -170,6 +210,167 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
return self.config["filter_schemas"]
return super().get_schema_names(engine, inspected)

def discover_catalog_entry( # noqa: PLR0913
self,
engine: Engine,
inspected: Inspector,
schema_name: str,
table_name: str,
is_view: bool, # noqa: FBT001
) -> CatalogEntry:
"""Overrode to support Vitess as DESCRIBE is not supported for views.

Create `CatalogEntry` object for the given table or a view.

Args:
engine: SQLAlchemy engine
inspected: SQLAlchemy inspector instance for engine
schema_name: Schema name to inspect
table_name: Name of the table or a view
is_view: Flag whether this object is a view, returned by `get_object_names`

Returns:
`CatalogEntry` object for the given table or a view
"""
if self.is_vitess is False or is_view is False:
return super().discover_catalog_entry(
engine, inspected, schema_name, table_name, is_view
)
# For vitess views, we can't use DESCRIBE as it's not supported for
# views so we do the below.
unique_stream_id = self.get_fully_qualified_name(
db_name=None,
schema_name=schema_name,
table_name=table_name,
delimiter="-",
)

# Initialize columns list
table_schema = th.PropertiesList()
with self._connect() as conn:
columns = conn.execute(f"SHOW columns from `{schema_name}`.`{table_name}`")
for column in columns:
column_name = column["Field"]
is_nullable = column["Null"] == "YES"
jsonschema_type: dict = self.to_jsonschema_type(column["Type"])
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
schema = table_schema.to_dict()

# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
# Notes regarding replication methods:
# - 'INCREMENTAL' replication must be enabled by the user by specifying
# a replication_key value.
# - 'LOG_BASED' replication must be enabled by the developer, according
# to source-specific implementation capabilities.
replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))

# Create the catalog entry object
return CatalogEntry(
tap_stream_id=unique_stream_id,
stream=unique_stream_id,
table=table_name,
key_properties=None,
schema=Schema.from_dict(schema),
is_view=is_view,
replication_method=replication_method,
metadata=MetadataMapping.get_standard_metadata(
schema_name=schema_name,
schema=schema,
replication_method=replication_method,
key_properties=None,
valid_replication_keys=None, # Must be defined by user
),
database=None, # Expects single-database context
row_count=None,
stream_alias=None,
replication_key=None, # Must be defined by user
)

def get_sqlalchemy_type(self, col_meta_type: str) -> sqlalchemy.Column:
"""Return a SQLAlchemy type object for the given SQL type.

Used ischema_names so we don't have to manually map all types.
"""
dialect = sqlalchemy.dialects.mysql.base.dialect() # type: ignore[attr-defined]
ischema_names = dialect.ischema_names
# Example varchar(97)
type_info = col_meta_type.split("(")
base_type_name = type_info[0].split(" ")[0] # bigint unsigned
type_args = (
type_info[1].split(" ")[0].rstrip(")") if len(type_info) > 1 else None
) # decimal(25,4) unsigned should work

if base_type_name in {"enum", "set"}:
self.logger.warning(
"Enum and Set types not supported for col_meta_type=%s. "
"Using varchar instead.",
col_meta_type,
)
base_type_name = "varchar"
type_args = None

type_class = ischema_names.get(base_type_name.lower())

try:
# Create an instance of the type class with parameters if they exist
if type_args:
return type_class(
*map(int, type_args.split(","))
) # Want to create a varchar(97) if asked for
return type_class()
except Exception:
self.logger.exception(
"Error creating sqlalchemy type for col_meta_type=%s", col_meta_type
)
raise

def get_table_columns(
self,
full_table_name: str,
column_names: list[str] | None = None,
) -> dict[str, sqlalchemy.Column]:
"""Overrode to support Vitess as DESCRIBE is not supported for views.

Return a list of table columns.

Args:
full_table_name: Fully qualified table name.
column_names: A list of column names to filter to.

Returns:
An ordered list of column objects.
"""
if self.is_vitess is False:
return super().get_table_columns(full_table_name, column_names)
# If Vitess Instance then we can't use DESCRIBE as it's not supported
# for views so we do below
if full_table_name not in self._table_cols_cache:
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
with self._connect() as conn:
columns = conn.execute(
f"SHOW columns from `{schema_name}`.`{table_name}`"
)
self._table_cols_cache[full_table_name] = {
col_meta["Field"]: sqlalchemy.Column(
col_meta["Field"],
self.get_sqlalchemy_type(col_meta["Type"]),
nullable=col_meta["Null"] == "YES",
)
for col_meta in columns
if not column_names
or col_meta["Field"].casefold()
in {col.casefold() for col in column_names}
}

return self._table_cols_cache[full_table_name]


class MySQLStream(SQLStream):
"""Stream class for MySQL streams."""
@@ -219,5 +420,10 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
if start_val:
query = query.filter(replication_key_col >= start_val)

for row in self.connector.connection.execute(query):
yield dict(row)
with self.connector._connect() as conn: # noqa: SLF001
if self.connector.is_vitess: # type: ignore[attr-defined]
conn.exec_driver_sql(
"set workload=olap"
) # See https://github.com/planetscale/discussion/discussions/190
for row in conn.execute(query):
yield dict(row)