Skip to content

Commit

Permalink
Basic but working implementation of MySQL working copy
Browse files Browse the repository at this point in the history
  • Loading branch information
olsen232 committed Apr 14, 2021
1 parent c7ce4b9 commit 0565783
Show file tree
Hide file tree
Showing 8 changed files with 1,016 additions and 3 deletions.
24 changes: 24 additions & 0 deletions sno/sqlalchemy/create_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,30 @@ def _on_connect(psycopg2_conn, connection_record):
return engine


CANONICAL_MYSQL_SCHEME = "mysql"
INTERNAL_MYSQL_SCHEME = "mysql+pymysql"


def mysql_engine(msurl):
def _on_connect(mysql_conn, connection_record):
dbcur = mysql_conn.cursor()
dbcur.execute("SET time_zone='UTC';")
dbcur.execute("SET sql_mode = 'ANSI_QUOTES';")

url = urlsplit(msurl)
if url.scheme != CANONICAL_MYSQL_SCHEME:
raise ValueError("Expecting mysql://")
# url_query = _append_to_query(
# url.query, {"Application Name": "sno"}
# )
msurl = urlunsplit([INTERNAL_MYSQL_SCHEME, url.netloc, url.path, url.query, ""])

engine = sqlalchemy.create_engine(msurl)
sqlalchemy.event.listen(engine, "connect", _on_connect)

return engine


CANONICAL_SQL_SERVER_SCHEME = "mssql"
INTERNAL_SQL_SERVER_SCHEME = "mssql+pyodbc"
SQL_SERVER_INSTALL_DOC_URL = (
Expand Down
33 changes: 33 additions & 0 deletions sno/sqlalchemy/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,39 @@ def compile_upsert_postgresql(upsert_stmt, compiler, **kwargs):
return compiler.process(insert_stmt)


@compiles(Upsert, "mysql")
def compile_upsert_mysql(upsert_stmt, compiler, **kwargs):
# See https://dev.mysql.com/doc/refman/8.0/en/insert-on-duplicate.html
preparer = compiler.preparer

def list_cols(col_names, prefix=""):
return ", ".join([prefix + c for c in col_names])

values = ", ".join(upsert_stmt.values(compiler))
table = preparer.format_table(upsert_stmt.table)
all_columns = [preparer.quote(c.name) for c in upsert_stmt.columns]
non_pk_columns = [preparer.quote(c.name) for c in upsert_stmt.non_pk_columns]

is_gte_version_8 = compiler.dialect.server_version_info[0] >= 8

if is_gte_version_8:
# Post 8.0 - don't use VALUES() again to refer to earlier VALUES.
# Instead, alias them. See https://dev.mysql.com/worklog/task/?id=13325
result = f"INSERT INTO {table} ({list_cols(all_columns)}) "
result += f" VALUES ({values}) AS SOURCE ({list_cols(all_columns)})"
result += " ON DUPLICATE KEY UPDATE "
result += ", ".join([f"{c} = SOURCE.{c}" for c in non_pk_columns])

else:
# Pre 8.0 - reuse VALUES to refer to earlier VALUES.
result = f"INSERT INTO {table} ({list_cols(all_columns)}) "
result += f" VALUES ({values})"
result += " ON DUPLICATE KEY UPDATE "
result += ", ".join([f"{c} = VALUES({c})" for c in non_pk_columns]) # 5.7

return result


@compiles(Upsert, "mssql")
def compile_upsert_mssql(upsert_stmt, compiler, **kwargs):
# See https://docs.microsoft.com/sql/t-sql/statements/merge-transact-sql
Expand Down
11 changes: 10 additions & 1 deletion sno/working_copy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class WorkingCopyType(Enum):
GPKG = auto()
POSTGIS = auto()
SQL_SERVER = auto()
MYSQL = auto()

@classmethod
def from_location(cls, location, allow_invalid=False):
Expand All @@ -17,6 +18,8 @@ def from_location(cls, location, allow_invalid=False):
return WorkingCopyType.POSTGIS
elif location.startswith("mssql:"):
return WorkingCopyType.SQL_SERVER
elif location.startswith("mysql:"):
return WorkingCopyType.MYSQL
elif location.lower().endswith(".gpkg"):
return WorkingCopyType.GPKG
elif allow_invalid:
Expand All @@ -27,7 +30,8 @@ def from_location(cls, location, allow_invalid=False):
"Try one of:\n"
" PATH.gpkg\n"
" postgresql://[HOST]/DBNAME/DBSCHEMA\n"
" mssql://[HOST]/DBNAME/DBSCHEMA"
" mssql://[HOST]/DBNAME/DBSCHEMA\n"
" mysql://[HOST]/DBNAME"
)

@property
Expand All @@ -44,6 +48,11 @@ def class_(self):
from .sqlserver import WorkingCopy_SqlServer

return WorkingCopy_SqlServer
elif self is WorkingCopyType.MYSQL:
from .mysql import WorkingCopy_MySql

return WorkingCopy_MySql

raise RuntimeError("Invalid WorkingCopyType")


Expand Down
Loading

0 comments on commit 0565783

Please sign in to comment.