From 05f5bd59bd6c1266574cfa52052e6a94e8bcf4b8 Mon Sep 17 00:00:00 2001 From: Andrew Olsen Date: Thu, 8 Apr 2021 09:43:42 +1200 Subject: [PATCH] Basic but working implementation of MySQL working copy --- kart/sqlalchemy/create_engine.py | 24 +++ kart/sqlalchemy/upsert.py | 33 +++ kart/working_copy/__init__.py | 11 +- kart/working_copy/db_server.py | 2 +- kart/working_copy/mysql.py | 327 +++++++++++++++++++++++++++++ kart/working_copy/mysql_adapter.py | 222 ++++++++++++++++++++ kart/working_copy/table_defs.py | 33 ++- tests/conftest.py | 53 ++++- tests/test_working_copy_mysql.py | 316 ++++++++++++++++++++++++++++ 9 files changed, 1017 insertions(+), 4 deletions(-) create mode 100644 kart/working_copy/mysql.py create mode 100644 kart/working_copy/mysql_adapter.py create mode 100644 tests/test_working_copy_mysql.py diff --git a/kart/sqlalchemy/create_engine.py b/kart/sqlalchemy/create_engine.py index b723da5c7..1e5fb54d0 100644 --- a/kart/sqlalchemy/create_engine.py +++ b/kart/sqlalchemy/create_engine.py @@ -119,6 +119,30 @@ def _on_checkout(dbapi_connection, connection_record, connection_proxy): return engine +CANONICAL_MYSQL_SCHEME = "mysql" +INTERNAL_MYSQL_SCHEME = "mysql+pymysql" + + +def mysql_engine(msurl): + def _on_connect(mysql_conn, connection_record): + dbcur = mysql_conn.cursor() + dbcur.execute("SET time_zone='UTC';") + dbcur.execute("SET sql_mode = 'ANSI_QUOTES';") + + url = urlsplit(msurl) + if url.scheme != CANONICAL_MYSQL_SCHEME: + raise ValueError("Expecting mysql://") + # url_query = _append_to_query( + # url.query, {"Application Name": "sno"} + # ) + msurl = urlunsplit([INTERNAL_MYSQL_SCHEME, url.netloc, url.path, url.query, ""]) + + engine = sqlalchemy.create_engine(msurl) + sqlalchemy.event.listen(engine, "connect", _on_connect) + + return engine + + CANONICAL_SQL_SERVER_SCHEME = "mssql" INTERNAL_SQL_SERVER_SCHEME = "mssql+pyodbc" SQL_SERVER_INSTALL_DOC_URL = ( diff --git a/kart/sqlalchemy/upsert.py b/kart/sqlalchemy/upsert.py index 372cd7b48..c0b3676bf 100644 --- a/kart/sqlalchemy/upsert.py +++ b/kart/sqlalchemy/upsert.py @@ -67,6 +67,39 @@ def compile_upsert_postgresql(upsert_stmt, compiler, **kwargs): return compiler.process(insert_stmt) +@compiles(Upsert, "mysql") +def compile_upsert_mysql(upsert_stmt, compiler, **kwargs): + # See https://dev.mysql.com/doc/refman/8.0/en/insert-on-duplicate.html + preparer = compiler.preparer + + def list_cols(col_names, prefix=""): + return ", ".join([prefix + c for c in col_names]) + + values = ", ".join(upsert_stmt.values(compiler)) + table = preparer.format_table(upsert_stmt.table) + all_columns = [preparer.quote(c.name) for c in upsert_stmt.columns] + non_pk_columns = [preparer.quote(c.name) for c in upsert_stmt.non_pk_columns] + + is_gte_version_8 = compiler.dialect.server_version_info[0] >= 8 + + if is_gte_version_8: + # Post 8.0 - don't use VALUES() again to refer to earlier VALUES. + # Instead, alias them. See https://dev.mysql.com/worklog/task/?id=13325 + result = f"INSERT INTO {table} ({list_cols(all_columns)}) " + result += f" VALUES ({values}) AS SOURCE ({list_cols(all_columns)})" + result += " ON DUPLICATE KEY UPDATE " + result += ", ".join([f"{c} = SOURCE.{c}" for c in non_pk_columns]) + + else: + # Pre 8.0 - reuse VALUES to refer to earlier VALUES. 
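For reference, the two branches of this compiler produce SQL along the following lines. This is a hedged sketch only, for a hypothetical table my_table(fid, name) with fid as the primary key; neither string appears verbatim in the patch, the placeholder style depends on the driver, and the row-alias form shown first is only accepted by newer 8.0 releases (reportedly 8.0.19 onward), which the major-version check above does not distinguish.

    # Illustrative only - hypothetical table my_table(fid, name), pk = fid.
    # MySQL 8.0: alias the inserted row instead of calling VALUES() again.
    upsert_sql_8_0 = (
        "INSERT INTO `my_table` (`fid`, `name`) "
        "VALUES (%(fid)s, %(name)s) AS SOURCE (`fid`, `name`) "
        "ON DUPLICATE KEY UPDATE `name` = SOURCE.`name`"
    )
    # MySQL 5.x: refer back to the inserted row with the VALUES() function.
    upsert_sql_5_7 = (
        "INSERT INTO `my_table` (`fid`, `name`) "
        "VALUES (%(fid)s, %(name)s) "
        "ON DUPLICATE KEY UPDATE `name` = VALUES(`name`)"
    )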
+ result = f"INSERT INTO {table} ({list_cols(all_columns)}) " + result += f" VALUES ({values})" + result += " ON DUPLICATE KEY UPDATE " + result += ", ".join([f"{c} = VALUES({c})" for c in non_pk_columns]) # 5.7 + + return result + + @compiles(Upsert, "mssql") def compile_upsert_mssql(upsert_stmt, compiler, **kwargs): # See https://docs.microsoft.com/sql/t-sql/statements/merge-transact-sql diff --git a/kart/working_copy/__init__.py b/kart/working_copy/__init__.py index ed5b572b5..481962a90 100644 --- a/kart/working_copy/__init__.py +++ b/kart/working_copy/__init__.py @@ -9,6 +9,7 @@ class WorkingCopyType(Enum): GPKG = auto() POSTGIS = auto() SQL_SERVER = auto() + MYSQL = auto() @classmethod def from_location(cls, location, allow_invalid=False): @@ -17,6 +18,8 @@ def from_location(cls, location, allow_invalid=False): return WorkingCopyType.POSTGIS elif location.startswith("mssql:"): return WorkingCopyType.SQL_SERVER + elif location.startswith("mysql:"): + return WorkingCopyType.MYSQL elif location.lower().endswith(".gpkg"): return WorkingCopyType.GPKG elif allow_invalid: @@ -27,7 +30,8 @@ def from_location(cls, location, allow_invalid=False): "Try one of:\n" " PATH.gpkg\n" " postgresql://[HOST]/DBNAME/DBSCHEMA\n" - " mssql://[HOST]/DBNAME/DBSCHEMA" + " mssql://[HOST]/DBNAME/DBSCHEMA\n" + " mysql://[HOST]/DBNAME" ) @property @@ -44,6 +48,11 @@ def class_(self): from .sqlserver import WorkingCopy_SqlServer return WorkingCopy_SqlServer + elif self is WorkingCopyType.MYSQL: + from .mysql import WorkingCopy_MySql + + return WorkingCopy_MySql + raise RuntimeError("Invalid WorkingCopyType") diff --git a/kart/working_copy/db_server.py b/kart/working_copy/db_server.py index e3b8df08d..d518d3056 100644 --- a/kart/working_copy/db_server.py +++ b/kart/working_copy/db_server.py @@ -90,7 +90,7 @@ def _separate_db_schema(cls, db_uri, expected_path_length=2): """ Removes the DBSCHEMA part off the end of a URI's path, and returns the URI and the DBSCHEMA separately. Useful since generally, it is not necessary (or even possible) to connect to a particular DBSCHEMA directly, - instead, the rest of the URI is used to connect, then the DBSCHEMA is sped + instead, the rest of the URI is used to connect, then the DBSCHEMA is specified in every query / command. """ url = urlsplit(db_uri) url_path = PurePath(url.path) diff --git a/kart/working_copy/mysql.py b/kart/working_copy/mysql.py new file mode 100644 index 000000000..eb317d042 --- /dev/null +++ b/kart/working_copy/mysql.py @@ -0,0 +1,327 @@ +import contextlib +import logging +import time + + +from sqlalchemy.dialects.mysql.base import MySQLIdentifierPreparer +from sqlalchemy.sql.functions import Function +from sqlalchemy.orm import sessionmaker +from sqlalchemy.types import UserDefinedType + +from . import mysql_adapter +from .db_server import DatabaseServer_WorkingCopy +from .table_defs import MySqlKartTables +from kart import crs_util +from kart.geometry import Geometry +from kart.sqlalchemy import text_with_inlined_params +from kart.sqlalchemy.create_engine import mysql_engine + + +class WorkingCopy_MySql(DatabaseServer_WorkingCopy): + """ + MySQL working copy implementation. + + Requirements: + 1. The MySQL server needs to exist + 2. The database user needs to be able to: + - Create the specified database (unless it already exists). + - Create, delete and alter tables and triggers in the specified database. 
+ """ + + WORKING_COPY_TYPE_NAME = "MySQL" + URI_SCHEME = "mysql" + + URI_FORMAT = "//HOST[:PORT]/DBNAME" + URI_VALID_PATH_LENGTHS = (1,) + INVALID_PATH_MESSAGE = "URI path must have one part - the database name" + + def __init__(self, repo, location): + """ + uri: connection string of the form mysql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] + """ + self.L = logging.getLogger(self.__class__.__qualname__) + + self.repo = repo + self.uri = self.location = location + + self.check_valid_db_uri(self.uri, repo) + self.db_uri, self.db_schema = self._separate_db_schema( + self.uri, expected_path_length=1 + ) + + self.engine = mysql_engine(self.db_uri) + self.sessionmaker = sessionmaker(bind=self.engine) + self.preparer = MySQLIdentifierPreparer(self.engine.dialect) + + self.kart_tables = MySqlKartTables(self.db_schema) + + def _create_table_for_dataset(self, sess, dataset): + table_spec = mysql_adapter.v2_schema_to_mysql_spec(dataset.schema, dataset) + sess.execute( + f"""CREATE TABLE IF NOT EXISTS {self.table_identifier(dataset)} ({table_spec});""" + ) + + def _type_def_for_column_schema(self, col, dataset): + if col.data_type == "geometry": + crs_name = col.extra_type_info.get("geometryCRS") + crs_id = crs_util.get_identifier_int_from_dataset(dataset, crs_name) or 0 + # This user-defined GeometryType adapts Kart's GPKG geometry to SQL Server's native geometry type. + return GeometryType(crs_id) + elif col.data_type == "timestamp": + return TimestampType + else: + # Don't need to specify type information for other columns at present, since we just pass through the values. + return None + + def _write_meta(self, sess, dataset): + """Write the title (as a comment) and the CRS. Other metadata is not stored in a PostGIS WC.""" + self._write_meta_title(sess, dataset) + self._write_meta_crs(sess, dataset) + + def _write_meta_title(self, sess, dataset): + """Write the dataset title as a comment on the table.""" + sess.execute( + f"ALTER TABLE {self.table_identifier(dataset)} COMMENT = :comment", + {"comment": dataset.get_meta_item("title")}, + ) + + def _write_meta_crs(self, sess, dataset): + """Populate the spatial_ref_sys table with data from this dataset.""" + # TODO - MYSQL-PART-2: Actually store CRS, if this is possible. + pass + + def delete_meta(self, dataset): + """Delete any metadata that is only needed by this dataset.""" + # TODO - MYSQL-PART-2: Delete any extra metadata that is not stored in the table itself. + pass + + def _create_spatial_index_post(self, sess, dataset): + # Only implemented as _create_spatial_index_post: + # It is more efficient to write the features first, then index them all in bulk. + + # TODO - MYSQL-PART-2 - We can only create a spatial index if the geometry column is declared + # not-null, but a datasets V2 schema doesn't distinguish between NULL and NOT NULL columns. + # So we don't know if the user would rather have an index, or be able to store NULL values. + return # Find a fix. + + L = logging.getLogger(f"{self.__class__.__qualname__}._create_spatial_index") + + geom_col = dataset.geom_column_name + + L.debug("Creating spatial index for %s.%s", dataset.table_name, geom_col) + t0 = time.monotonic() + + sess.execute( + f"ALTER TABLE {self.table_identifier(dataset)} ADD SPATIAL INDEX({self.quote(geom_col)})" + ) + + L.info("Created spatial index in %ss", time.monotonic() - t0) + + def _drop_spatial_index(self, sess, dataset): + # MySQL deletes the spatial index automatically when the table is deleted. 
+ pass + + def _sno_tracking_name(self, trigger_type, dataset=None): + """Returns the sno-branded name of the trigger reponsible for populating the sno_track table.""" + assert dataset is None + return f"_sno_track_{trigger_type}" + + def _create_triggers(self, sess, dataset): + table_identifier = self.table_identifier(dataset) + pk_column = self.quote(dataset.primary_key) + + sess.execute( + text_with_inlined_params( + f""" + CREATE TRIGGER {self._quoted_tracking_name('ins', dataset)} + AFTER INSERT ON {table_identifier} + FOR EACH ROW + REPLACE INTO {self.KART_TRACK} (table_name, pk) + VALUES (:table_name, NEW.{pk_column}) + """, + {"table_name": dataset.table_name}, + ) + ) + sess.execute( + text_with_inlined_params( + f""" + CREATE TRIGGER {self._quoted_tracking_name('upd', dataset)} + AFTER UPDATE ON {table_identifier} + FOR EACH ROW + REPLACE INTO {self.KART_TRACK} (table_name, pk) + VALUES (:table_name1, OLD.{pk_column}), (:table_name2, NEW.{pk_column}) + """, + {"table_name1": dataset.table_name, "table_name2": dataset.table_name}, + ) + ) + sess.execute( + text_with_inlined_params( + f""" + CREATE TRIGGER {self._quoted_tracking_name('del', dataset)} + AFTER DELETE ON {table_identifier} + FOR EACH ROW + REPLACE INTO {self.KART_TRACK} (table_name, pk) + VALUES (:table_name, OLD.{pk_column}) + """, + {"table_name": dataset.table_name}, + ) + ) + + def _drop_triggers(self, sess, dataset): + sess.execute(f"DROP TRIGGER {self._quoted_tracking_name('ins', dataset)}") + sess.execute(f"DROP TRIGGER {self._quoted_tracking_name('upd', dataset)}") + sess.execute(f"DROP TRIGGER {self._quoted_tracking_name('del', dataset)}") + + @contextlib.contextmanager + def _suspend_triggers(self, sess, dataset): + self._drop_triggers(sess, dataset) + yield + self._create_triggers(sess, dataset) + + def meta_items(self, dataset): + with self.session() as sess: + table_info_sql = """ + SELECT + C.column_name, C.ordinal_position, C.data_type, C.srs_id, + C.character_maximum_length, C.numeric_precision, C.numeric_scale, + KCU.ordinal_position AS pk_ordinal_position + FROM information_schema.columns C + LEFT OUTER JOIN information_schema.key_column_usage KCU + ON (KCU.table_schema = C.table_schema) + AND (KCU.table_name = C.table_name) + AND (KCU.column_name = C.column_name) + WHERE C.table_schema=:table_schema AND C.table_name=:table_name + ORDER BY C.ordinal_position; + """ + r = sess.execute( + table_info_sql, + {"table_schema": self.db_schema, "table_name": dataset.table_name}, + ) + mysql_table_info = list(r) + + spatial_ref_sys_sql = """ + SELECT SRS.* FROM information_schema.st_spatial_reference_systems SRS + LEFT OUTER JOIN information_schema.st_geometry_columns GC ON (GC.srs_id = SRS.srs_id) + WHERE GC.table_schema=:table_schema AND GC.table_name=:table_name; + """ + r = sess.execute( + spatial_ref_sys_sql, + {"table_schema": self.db_schema, "table_name": dataset.table_name}, + ) + mysql_spatial_ref_sys = list(r) + + id_salt = f"{self.db_schema} {dataset.table_name} {self.get_db_tree()}" + schema = mysql_adapter.sqlserver_to_v2_schema( + mysql_table_info, mysql_spatial_ref_sys, id_salt + ) + yield "schema.json", schema.to_column_dicts() + + @classmethod + def try_align_schema_col(cls, old_col_dict, new_col_dict): + old_type = old_col_dict["dataType"] + new_type = new_col_dict["dataType"] + + # Some types have to be approximated as other types in MySQL + if mysql_adapter.APPROXIMATED_TYPES.get(old_type) == new_type: + new_col_dict["dataType"] = new_type = old_type + for key in 
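To make the trigger mechanics above concrete, this is roughly the statement the 'upd' branch generates, assuming a table my_points with primary key fid; the database, table, trigger and tracking-table names are illustrative and in practice come from _quoted_tracking_name, table_identifier and self.KART_TRACK. Because (table_name, pk) is the primary key of the tracking table, REPLACE INTO keeps at most one row per edited feature no matter how many times it is edited.

    # Illustrative rendering of the generated AFTER UPDATE trigger.
    upd_trigger_sql = """
        CREATE TRIGGER `_kart_track_upd`
            AFTER UPDATE ON `my_db`.`my_points`
        FOR EACH ROW
            REPLACE INTO `my_db`.`_kart_track` (table_name, pk)
            VALUES ('my_points', OLD.`fid`), ('my_points', NEW.`fid`);
    """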
mysql_adapter.APPROXIMATED_TYPES_EXTRA_TYPE_INFO: + new_col_dict[key] = old_col_dict.get(key) + + # Geometry types don't have to be approximated, except for the Z/M specifiers. + if old_type == "geometry" and new_type == "geometry": + old_gtype = old_col_dict.get("geometryType") + new_gtype = new_col_dict.get("geometryType") + if old_gtype and new_gtype and old_gtype != new_gtype: + if old_gtype.split(" ")[0] == new_gtype: + new_col_dict["geometryType"] = new_gtype = old_gtype + + return new_type == old_type + + _UNSUPPORTED_META_ITEMS = ( + "title", + "description", + "metadata/dataset.json", + "metadata.xml", + ) + + def _remove_hidden_meta_diffs(self, dataset, ds_meta_items, wc_meta_items): + super()._remove_hidden_meta_diffs(dataset, ds_meta_items, wc_meta_items) + + # Nowhere to put these in SQL Server WC + for key in self._UNSUPPORTED_META_ITEMS: + if key in ds_meta_items: + del ds_meta_items[key] + + # Diffing CRS is not yet supported. + for key in list(ds_meta_items.keys()): + if key.startswith("crs/"): + del ds_meta_items[key] + + def _is_meta_update_supported(self, dataset_version, meta_diff): + """ + Returns True if the given meta-diff is supported *without* dropping and rewriting the table. + (Any meta change is supported if we drop and rewrite the table, but of course it is less efficient). + meta_diff - DeltaDiff object containing the meta changes. + """ + # For now, just always drop and rewrite. + return not meta_diff + + +class GeometryType(UserDefinedType): + """UserDefinedType so that V2 geometry is adapted to MySQL binary format.""" + + # TODO: is "axis-order=long-lat" always the correct behaviour? It makes all the tests pass. + AXIS_ORDER = "axis-order=long-lat" + + def __init__(self, crs_id): + self.crs_id = crs_id + + def bind_processor(self, dialect): + # 1. Writing - Python layer - convert Kart geometry to WKB + return lambda geom: geom.to_wkb() + + def bind_expression(self, bindvalue): + # 2. Writing - SQL layer - wrap in call to ST_GeomFromWKB to convert WKB to MySQL binary. + return Function( + "ST_GeomFromWKB", bindvalue, self.crs_id, self.AXIS_ORDER, type_=self + ) + + def column_expression(self, col): + # 3. Reading - SQL layer - wrap in call to ST_AsBinary() to convert MySQL binary to WKB. + return Function("ST_AsBinary", col, self.AXIS_ORDER, type_=self) + + def result_processor(self, dialect, coltype): + # 4. Reading - Python layer - convert WKB to Kart geometry. + return lambda wkb: Geometry.from_wkb(wkb) + + +class DateType(UserDefinedType): + # UserDefinedType to read Dates as text. They are stored in MySQL as Dates but we read them back as text. + def column_expression(self, col): + # Reading - SQL layer - convert date to string in ISO8601. + # https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html + return Function("DATE_FORMAT", col, "%Y-%m-%d", type_=self) + + +class TimeType(UserDefinedType): + # UserDefinedType to read Times as text. They are stored in MySQL as Times but we read them back as text. + def column_expression(self, col): + # Reading - SQL layer - convert timestamp to string in ISO8601. + # https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html + return Function("DATE_FORMAT", col, "%H:%i:%S", type_=self) + + +class TimestampType(UserDefinedType): + """ + UserDefinedType to read Timestamps as text. They are stored in MySQL as Timestamps but we read them back as text. + """ + + def bind_processor(self, dialect): + # 1. Writing - Python layer - remove timezone specifier - MySQL can't read timezone specifiers. 
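The GeometryType defined above adapts geometry values in four steps. A hedged sketch of what a write and a read of a column named geom boil down to, for a dataset whose CRS resolves to SRID 4167 (the column name and SRID are illustrative):

    # 1. Python, write side: Kart geometry -> WKB bytes       (bind_processor)
    # 2. SQL, write side: WKB -> native MySQL geometry        (bind_expression)
    write_expr = "ST_GeomFromWKB(:geom_wkb, 4167, 'axis-order=long-lat')"
    # 3. SQL, read side: native geometry -> WKB               (column_expression)
    read_expr = "ST_AsBinary(`geom`, 'axis-order=long-lat')"
    # 4. Python, read side: WKB bytes -> Kart geometry        (result_processor)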
+ # MySQL requires instead that the timezone is set in the database session (see create_engine.py) + return lambda timestamp: timestamp.rstrip("Z") + + def column_expression(self, col): + # 2. Reading - SQL layer - convert timestamp to string in ISO8601 with Z as the timezone specifier. + # https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html + return Function("DATE_FORMAT", col, "%Y-%m-%dT%H:%i:%SZ", type_=self) diff --git a/kart/working_copy/mysql_adapter.py b/kart/working_copy/mysql_adapter.py new file mode 100644 index 000000000..a38c7ab76 --- /dev/null +++ b/kart/working_copy/mysql_adapter.py @@ -0,0 +1,222 @@ +from kart import crs_util +from kart.schema import Schema, ColumnSchema + +from sqlalchemy.dialects.mysql.base import MySQLIdentifierPreparer, MySQLDialect + + +_PREPARER = MySQLIdentifierPreparer(MySQLDialect()) + + +V2_TYPE_TO_MYSQL_TYPE = { + "boolean": "bit(1)", + "blob": "longblob", + "date": "date", + "float": {0: "float", 32: "float", 64: "double precision"}, + "geometry": "geometry", + "integer": { + 0: "int", + 8: "tinyint", + 16: "smallint", + 32: "int", + 64: "bigint", + }, + "interval": "text", + "numeric": "numeric", + "text": "longtext", + "time": "time", + "timestamp": "timestamp", +} + + +MYSQL_TYPE_TO_V2_TYPE = { + "bit": "boolean", + "tinyint": ("integer", 8), + "smallint": ("integer", 16), + "int": ("integer", 32), + "bigint": ("integer", 64), + "float": ("float", 32), + "double": ("float", 64), + "double precision": ("float", 64), + "binary": "blob", + "blob": "blob", + "char": "text", + "date": "date", + "datetime": "timestamp", + "decimal": "numeric", + "geometry": "geometry", + "numeric": "numeric", + "text": "text", + "time": "time", + "timestamp": "timestamp", + "varchar": "text", + "varbinary": "blob", +} + +for prefix in ["tiny", "medium", "long"]: + MYSQL_TYPE_TO_V2_TYPE[f"{prefix}blob"] = "blob" + MYSQL_TYPE_TO_V2_TYPE[f"{prefix}text"] = "text" + + +# Types that can't be roundtripped perfectly in MySQL, and what they end up as. +APPROXIMATED_TYPES = {"interval": "text"} + +# Extra type info that might be missing/extra due to an approximated type. 
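The APPROXIMATED_TYPES mapping defined below is what makes type round-tripping tolerant. A short sketch of the effect for the one affected type, interval, using the dicts from this file (assumes the kart package is importable):

    from kart.working_copy import mysql_adapter

    # Writing: a v2 "interval" column has to be stored as a MySQL text column.
    assert mysql_adapter.V2_TYPE_TO_MYSQL_TYPE["interval"] == "text"
    # Reading it back, information_schema reports "text", which maps to v2 "text":
    assert mysql_adapter.MYSQL_TYPE_TO_V2_TYPE["text"] == "text"
    # APPROXIMATED_TYPES records this lossy round trip so that try_align_schema_col
    # (in mysql.py) can treat the text column as still being an interval column
    # rather than reporting a spurious schema diff.
    assert mysql_adapter.APPROXIMATED_TYPES["interval"] == "text"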
+APPROXIMATED_TYPES_EXTRA_TYPE_INFO = ("length",) + +MYSQL_GEOMETRY_TYPES = { + "GEOMETRY", + "POINT", + "LINESTRING", + "POLYGON", + "MULTIPOINT", + "MULTILINESTRING", + "MULTIPOLYGON", + "GEOMETRYCOLLECTION", +} + + +def quote(ident): + return _PREPARER.quote(ident) + + +def v2_schema_to_mysql_spec(schema, v2_obj): + """ + Generate the SQL CREATE TABLE spec from a V2 object eg: + 'fid INTEGER, geom POINT WITH CRSID 2136, desc VARCHAR(128), PRIMARY KEY(fid)' + """ + result = [_v2_column_schema_to_mysql_spec(col, v2_obj) for col in schema] + + if schema.pk_columns: + pk_col_names = ", ".join((quote(col.name) for col in schema.pk_columns)) + result.append(f"PRIMARY KEY({pk_col_names})") + + return ", ".join(result) + + +def _v2_column_schema_to_mysql_spec(column_schema, v2_obj): + name = column_schema.name + mysql_type = _v2_type_to_mysql_type(column_schema, v2_obj) + + return " ".join([quote(name), mysql_type]) + + +_MAX_SPECIFIABLE_LENGTH = 0xFFFF + + +def _v2_type_to_mysql_type(column_schema, v2_obj): + """Convert a v2 schema type to a MySQL type.""" + v2_type = column_schema.data_type + if v2_type == "geometry": + return _v2_geometry_type_to_mysql_type(column_schema, v2_obj) + + extra_type_info = column_schema.extra_type_info + + mysql_type_info = V2_TYPE_TO_MYSQL_TYPE.get(v2_type) + if mysql_type_info is None: + raise ValueError(f"Unrecognised data type: {v2_type}") + + if isinstance(mysql_type_info, dict): + return mysql_type_info.get(extra_type_info.get("size", 0)) + + mysql_type = mysql_type_info + + length = extra_type_info.get("length", None) + if length and length > 0 and length <= _MAX_SPECIFIABLE_LENGTH: + if mysql_type == "longtext": + return f"varchar({length})" + elif mysql_type == "longblob": + return f"varbinary({length})" + + if mysql_type == "numeric": + precision = extra_type_info.get("precision", None) + scale = extra_type_info.get("scale", None) + if precision is not None and scale is not None: + return f"numeric({precision},{scale})" + elif precision is not None: + return f"numeric({precision})" + else: + return "numeric" + + return mysql_type + + +def _v2_geometry_type_to_mysql_type(column_schema, v2_obj): + extra_type_info = column_schema.extra_type_info + mysql_type = extra_type_info.get("geometryType", "geometry").split(" ")[0] + + crs_name = extra_type_info.get("geometryCRS") + crs_id = crs_util.get_identifier_int_from_dataset(v2_obj, crs_name) + if crs_id is not None: + mysql_type += f" SRID {crs_id}" + + return mysql_type + + +def sqlserver_to_v2_schema(mysql_table_info, mysql_crs_info, id_salt): + """Generate a V2 schema from the given My SQL metadata.""" + return Schema( + [ + _mysql_to_column_schema(col, mysql_crs_info, id_salt) + for col in mysql_table_info + ] + ) + + +def _mysql_to_column_schema(mysql_col_info, mysql_crs_info, id_salt): + """ + Given the MySQL column info for a particular column, converts it to a ColumnSchema. + + Parameters: + mysql_col_info - info about a single column from mysql_table_info. + mysql_crs_info - info about all the CRS, in case this column is a geometry column. + id_salt - the UUIDs of the generated ColumnSchema are deterministic and depend on + the name and type of the column, and on this salt. 
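For a concrete picture of what v2_schema_to_mysql_spec below produces, here is a hedged example of the spec for a small schema; the column names, length and SRID are illustrative. Note that for MySQL the geometry column carries an inline SRID clause, rather than the "WITH CRSID" wording quoted in the docstring, which follows the other adapters.

    # Roughly what _create_table_for_dataset wraps in CREATE TABLE ... ( ... ):
    example_spec = (
        "`fid` int, "
        "`geom` POINT SRID 2136, "
        "`name` varchar(128), "
        "`date_adjusted` timestamp, "
        "PRIMARY KEY(`fid`)"
    )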
+ """ + name = mysql_col_info["COLUMN_NAME"] + pk_index = mysql_col_info["pk_ordinal_position"] + if pk_index is not None: + pk_index -= 1 + if mysql_col_info["DATA_TYPE"].upper() in MYSQL_GEOMETRY_TYPES: + data_type, extra_type_info = _mysql_type_to_v2_geometry_type( + mysql_col_info, mysql_crs_info + ) + else: + data_type, extra_type_info = _mysql_type_to_v2_type(mysql_col_info) + + col_id = ColumnSchema.deterministic_id(name, data_type, id_salt) + return ColumnSchema(col_id, name, data_type, pk_index, **extra_type_info) + + +def _mysql_type_to_v2_type(mysql_col_info): + v2_type_info = MYSQL_TYPE_TO_V2_TYPE.get(mysql_col_info["DATA_TYPE"]) + + if isinstance(v2_type_info, tuple): + v2_type = v2_type_info[0] + extra_type_info = {"size": v2_type_info[1]} + else: + v2_type = v2_type_info + extra_type_info = {} + + if v2_type in ("text", "blob"): + length = mysql_col_info["CHARACTER_MAXIMUM_LENGTH"] or None + if length is not None and length > 0 and length <= _MAX_SPECIFIABLE_LENGTH: + extra_type_info["length"] = length + + if v2_type == "numeric": + extra_type_info["precision"] = mysql_col_info["NUMERIC_PRECISION"] or None + extra_type_info["scale"] = mysql_col_info["NUMERIC_SCALE"] or None + + return v2_type, extra_type_info + + +def _mysql_type_to_v2_geometry_type(mysql_col_info, mysql_crs_info): + geometry_type = mysql_col_info["DATA_TYPE"].upper() + geometry_crs = None + + crs_id = mysql_col_info["SRS_ID"] + if crs_id: + crs_info = next((r for r in mysql_crs_info if r["SRS_ID"] == crs_id), None) + if crs_info: + geometry_crs = crs_util.get_identifier_str(crs_info["DEFINITION"]) + + return "geometry", {"geometryType": geometry_type, "geometryCRS": geometry_crs} diff --git a/kart/working_copy/table_defs.py b/kart/working_copy/table_defs.py index c28e1edcc..1db68fbe7 100644 --- a/kart/working_copy/table_defs.py +++ b/kart/working_copy/table_defs.py @@ -9,7 +9,7 @@ UniqueConstraint, ) -from sqlalchemy.types import NVARCHAR +from sqlalchemy.types import NVARCHAR, VARCHAR class TinyInt(Integer): @@ -94,6 +94,37 @@ class PostgisKartTables(AbstractKartTables): """Tables for Kart-specific metadata - PostGIS variant. Nothing special required.""" +class MySqlKartTables(AbstractKartTables): + """ + Tables for Kart-specific metadata - MySQL variant. + Primary keys have to be VARCHAR of a fixed maximum length - + if the total maximum length is too long, MySQL cannot generate an index. + """ + + def __init__(self, db_schema=None, is_kart_branding=False): + # Don't call super since we are redefining self.kart_state and self.kart_track. + self.sqlalchemy_metadata = MetaData() + self.db_schema = db_schema + self.is_kart_branding = is_kart_branding + + self.kart_state = Table( + self.kart_table_name(STATE), + self.sqlalchemy_metadata, + Column("table_name", VARCHAR(256), nullable=False, primary_key=True), + Column("key", VARCHAR(256), nullable=False, primary_key=True), + Column("value", Text, nullable=False), + schema=self.db_schema, + ) + + self.kart_track = Table( + self.kart_table_name(TRACK), + self.sqlalchemy_metadata, + Column("table_name", VARCHAR(256), nullable=False, primary_key=True), + Column("pk", VARCHAR(256), nullable=True, primary_key=True), + schema=self.db_schema, + ) + + class SqlServerKartTables(AbstractKartTables): """ Tables for kart-specific metadata - SQL Server variant. 
diff --git a/tests/conftest.py b/tests/conftest.py index 9fed94256..1ad5f0fed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,7 +24,12 @@ from kart.geometry import Geometry from kart.repo import KartRepo -from kart.sqlalchemy.create_engine import gpkg_engine, postgis_engine, sqlserver_engine +from kart.sqlalchemy.create_engine import ( + gpkg_engine, + postgis_engine, + sqlserver_engine, + mysql_engine, +) pytest_plugins = ["helpers_namespace"] @@ -995,3 +1000,49 @@ def _sqlserver_drop_schema_cascade(conn, db_schema): conn.execute(f"DROP TABLE IF EXISTS {table_identifiers};") conn.execute(f"DROP SCHEMA IF EXISTS {db_schema};") + + +@pytest.fixture() +def mysql_db(): + """ + Using docker, you can run a MySQL test - such as those in test_working_copy_mysql - as follows: + docker run -it --rm -d -p 13306:3306 -e MYSQL_ROOT_PASSWORD=PassWord1 mysql + KART_MYSQL_URL='mysql://root:PassWord1@localhost:13306' pytest -k mysql --pdb -vvs + """ + if "KART_MYSQL_URL" not in os.environ: + raise pytest.skip("Requires MySQL - read docstring at conftest.mysql_db") + engine = mysql_engine(os.environ["KART_MYSQL_URL"]) + with engine.connect() as conn: + # test connection: + try: + conn.execute("SELECT @@version;") + except sqlalchemy.exc.DBAPIError: + raise pytest.skip("Requires MySQL") + yield engine + + +@pytest.fixture() +def new_mysql_db_schema(request, mysql_db): + @contextlib.contextmanager + def ctx(create=False): + sha = hashlib.sha1(request.node.nodeid.encode("utf8")).hexdigest()[:20] + schema = f"kart_test_{sha}" + with mysql_db.connect() as conn: + # Start by deleting in case it is left over from last test-run... + conn.execute(f"""DROP SCHEMA IF EXISTS `{schema}`;""") + # Actually create only if create=True, otherwise the test will create it + if create: + conn.execute(f"""CREATE SCHEMA `{schema}`;""") + try: + url = urlsplit(os.environ["KART_MYSQL_URL"]) + url_path = url.path.rstrip("/") + "/" + schema + new_schema_url = urlunsplit( + [url.scheme, url.netloc, url_path, url.query, ""] + ) + yield new_schema_url, schema + finally: + # Clean up - delete it again if it exists. 
+ with mysql_db.connect() as conn: + conn.execute(f"""DROP SCHEMA IF EXISTS `{schema}`;""") + + return ctx diff --git a/tests/test_working_copy_mysql.py b/tests/test_working_copy_mysql.py new file mode 100644 index 000000000..3b19faf2f --- /dev/null +++ b/tests/test_working_copy_mysql.py @@ -0,0 +1,316 @@ +import pytest + +import pygit2 + +from kart.repo import KartRepo +from kart.working_copy import sqlserver_adapter +from kart.working_copy.base import WorkingCopyStatus +from kart.working_copy.db_server import DatabaseServer_WorkingCopy +from test_working_copy import compute_approximated_types + + +H = pytest.helpers.helpers() + + +@pytest.mark.parametrize( + "existing_schema", + [ + pytest.param(True, id="existing-schema"), + pytest.param(False, id="brand-new-schema"), + ], +) +@pytest.mark.parametrize( + "archive,table,commit_sha", + [ + pytest.param("points", H.POINTS.LAYER, H.POINTS.HEAD_SHA, id="points"), + pytest.param("polygons", H.POLYGONS.LAYER, H.POLYGONS.HEAD_SHA, id="polygons"), + pytest.param("table", H.TABLE.LAYER, H.TABLE.HEAD_SHA, id="table"), + ], +) +def test_checkout_workingcopy( + archive, + table, + commit_sha, + existing_schema, + data_archive, + cli_runner, + new_mysql_db_schema, +): + """ Checkout a working copy """ + with data_archive(archive) as repo_path: + repo = KartRepo(repo_path) + H.clear_working_copy() + + with new_mysql_db_schema(create=existing_schema) as ( + mysql_url, + mysql_schema, + ): + r = cli_runner.invoke(["create-workingcopy", mysql_url]) + assert r.exit_code == 0, r.stderr + assert ( + r.stdout.splitlines()[-1] + == f"Creating working copy at {DatabaseServer_WorkingCopy.strip_password(mysql_url)} ..." + ) + + r = cli_runner.invoke(["status"]) + assert r.exit_code == 0, r.stderr + assert r.stdout.splitlines() == [ + "On branch main", + "", + "Nothing to commit, working copy clean", + ] + + wc = repo.working_copy + assert wc.status() & WorkingCopyStatus.INITIALISED + assert wc.status() & WorkingCopyStatus.HAS_DATA + + head_tree_id = repo.head_tree.hex + assert wc.assert_db_tree_match(head_tree_id) + + +@pytest.mark.parametrize( + "existing_schema", + [ + pytest.param(True, id="existing-schema"), + pytest.param(False, id="brand-new-schema"), + ], +) +def test_init_import( + existing_schema, + new_mysql_db_schema, + data_archive, + tmp_path, + cli_runner, +): + """ Import the GeoPackage (eg. `kx-foo-layer.gpkg`) into a Kart repository. 
""" + repo_path = tmp_path / "repo" + repo_path.mkdir() + + with data_archive("gpkg-points") as data: + with new_mysql_db_schema(create=existing_schema) as ( + mysql_url, + mysql_schema, + ): + r = cli_runner.invoke( + [ + "init", + "--import", + f"gpkg:{data / 'nz-pa-points-topo-150k.gpkg'}", + str(repo_path), + f"--workingcopy-path={mysql_url}", + ] + ) + assert r.exit_code == 0, r.stderr + assert (repo_path / ".kart" / "HEAD").exists() + + repo = KartRepo(repo_path) + wc = repo.working_copy + assert wc.status() & WorkingCopyStatus.INITIALISED + assert wc.status() & WorkingCopyStatus.HAS_DATA + + assert wc.location == mysql_url + + +@pytest.mark.parametrize( + "archive,table,commit_sha", + [ + pytest.param("points", H.POINTS.LAYER, H.POINTS.HEAD_SHA, id="points"), + pytest.param("polygons", H.POLYGONS.LAYER, H.POLYGONS.HEAD_SHA, id="polygons"), + pytest.param("table", H.TABLE.LAYER, H.TABLE.HEAD_SHA, id="table"), + ], +) +def test_commit_edits( + archive, + table, + commit_sha, + data_archive, + cli_runner, + new_mysql_db_schema, + edit_points, + edit_polygons, + edit_table, +): + """ Checkout a working copy and make some edits """ + with data_archive(archive) as repo_path: + repo = KartRepo(repo_path) + H.clear_working_copy() + + with new_mysql_db_schema() as (mysql_url, mysql_schema): + r = cli_runner.invoke(["create-workingcopy", mysql_url]) + assert r.exit_code == 0, r.stderr + + r = cli_runner.invoke(["status"]) + assert r.exit_code == 0, r.stderr + assert r.stdout.splitlines() == [ + "On branch main", + "", + "Nothing to commit, working copy clean", + ] + + wc = repo.working_copy + assert wc.status() & WorkingCopyStatus.INITIALISED + assert wc.status() & WorkingCopyStatus.HAS_DATA + + with wc.session() as sess: + if archive == "points": + edit_points(sess, repo.datasets()[H.POINTS.LAYER], wc) + elif archive == "polygons": + edit_polygons(sess, repo.datasets()[H.POLYGONS.LAYER], wc) + elif archive == "table": + edit_table(sess, repo.datasets()[H.TABLE.LAYER], wc) + + r = cli_runner.invoke(["status"]) + assert r.exit_code == 0, r.stderr + assert r.stdout.splitlines() == [ + "On branch main", + "", + "Changes in working copy:", + ' (use "kart commit" to commit)', + ' (use "kart reset" to discard changes)', + "", + f" {table}:", + " feature:", + " 1 inserts", + " 2 updates", + " 5 deletes", + ] + orig_head = repo.head.peel(pygit2.Commit).hex + + r = cli_runner.invoke(["commit", "-m", "test_commit"]) + assert r.exit_code == 0, r.stderr + + r = cli_runner.invoke(["status"]) + assert r.exit_code == 0, r.stderr + assert r.stdout.splitlines() == [ + "On branch main", + "", + "Nothing to commit, working copy clean", + ] + + new_head = repo.head.peel(pygit2.Commit).hex + assert new_head != orig_head + + r = cli_runner.invoke(["checkout", "HEAD^"]) + + assert repo.head.peel(pygit2.Commit).hex == orig_head + + +def test_edit_schema(data_archive, cli_runner, new_mysql_db_schema): + with data_archive("polygons") as repo_path: + repo = KartRepo(repo_path) + H.clear_working_copy() + + with new_mysql_db_schema() as (mysql_url, mysql_schema): + r = cli_runner.invoke(["create-workingcopy", mysql_url]) + assert r.exit_code == 0, r.stderr + + wc = repo.working_copy + assert wc.status() & WorkingCopyStatus.INITIALISED + assert wc.status() & WorkingCopyStatus.HAS_DATA + + r = cli_runner.invoke(["diff", "--output-format=quiet"]) + assert r.exit_code == 0, r.stderr + + with wc.session() as sess: + sess.execute( + f"""ALTER TABLE "{mysql_schema}"."{H.POLYGONS.LAYER}" ADD colour NVARCHAR(32);""" + ) + 
sess.execute( + f"""ALTER TABLE "{mysql_schema}"."{H.POLYGONS.LAYER}" DROP COLUMN survey_reference;""" + ) + + r = cli_runner.invoke(["diff"]) + assert r.exit_code == 0, r.stderr + diff = r.stdout.splitlines() + + # New column "colour" has an ID is deterministically generated from the commit hash, + # but we don't care exactly what it is. + try: + colour_id_line = diff[-6] + except KeyError: + colour_id_line = "" + + assert diff[-46:] == [ + "--- nz_waca_adjustments:meta:schema.json", + "+++ nz_waca_adjustments:meta:schema.json", + " [", + " {", + ' "id": "79d3c4ca-3abd-0a30-2045-45169357113c",', + ' "name": "id",', + ' "dataType": "integer",', + ' "primaryKeyIndex": 0,', + ' "size": 64', + " },", + " {", + ' "id": "c1d4dea1-c0ad-0255-7857-b5695e3ba2e9",', + ' "name": "geom",', + ' "dataType": "geometry",', + ' "geometryType": "MULTIPOLYGON",', + ' "geometryCRS": "EPSG:4167"', + " },", + " {", + ' "id": "d3d4b64b-d48e-4069-4bb5-dfa943d91e6b",', + ' "name": "date_adjusted",', + ' "dataType": "timestamp"', + " },", + "- {", + '- "id": "dff34196-229d-f0b5-7fd4-b14ecf835b2c",', + '- "name": "survey_reference",', + '- "dataType": "text",', + '- "length": 50', + "- },", + " {", + ' "id": "13dc4918-974e-978f-05ce-3b4321077c50",', + ' "name": "adjusted_nodes",', + ' "dataType": "integer",', + ' "size": 32', + " },", + "+ {", + colour_id_line, + '+ "name": "colour",', + '+ "dataType": "text",', + '+ "length": 32', + "+ },", + " ]", + ] + + orig_head = repo.head.peel(pygit2.Commit).hex + + r = cli_runner.invoke(["commit", "-m", "test_commit"]) + assert r.exit_code == 0, r.stderr + + r = cli_runner.invoke(["status"]) + assert r.exit_code == 0, r.stderr + assert r.stdout.splitlines() == [ + "On branch main", + "", + "Nothing to commit, working copy clean", + ] + + new_head = repo.head.peel(pygit2.Commit).hex + assert new_head != orig_head + + r = cli_runner.invoke(["checkout", "HEAD^"]) + + assert repo.head.peel(pygit2.Commit).hex == orig_head + + +def test_approximated_types(): + assert sqlserver_adapter.APPROXIMATED_TYPES == compute_approximated_types( + sqlserver_adapter.V2_TYPE_TO_MS_TYPE, sqlserver_adapter.MS_TYPE_TO_V2_TYPE + ) + + +def test_types_roundtrip(data_archive, cli_runner, new_mysql_db_schema): + with data_archive("types") as repo_path: + repo = KartRepo(repo_path) + H.clear_working_copy() + + with new_mysql_db_schema() as (mysql_url, mysql_schema): + repo.config["kart.workingcopy.location"] = mysql_url + r = cli_runner.invoke(["checkout"]) + + # If type-approximation roundtrip code isn't working, + # we would get spurious diffs on types that SQL server doesn't support. + r = cli_runner.invoke(["diff", "--exit-code"]) + assert r.exit_code == 0, r.stdout
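Taken together, the tests above exercise the same flows a user would drive by hand. A sketch of the equivalent CLI session, with host, port, credentials and file names purely illustrative:

    # Attach a MySQL working copy to an existing Kart repository:
    kart create-workingcopy mysql://root:PassWord1@localhost:13306/my_database

    # Or import data and create the working copy in one step:
    kart init --import gpkg:nz-pa-points-topo-150k.gpkg my_repo \
        --workingcopy-path=mysql://root:PassWord1@localhost:13306/my_database

    # Day-to-day editing then follows the usual Kart cycle:
    kart status
    kart diff
    kart commit -m "edits made in the MySQL working copy"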