diff --git a/noxfile.py b/noxfile.py index 84f5f92c5..ba0e76b4f 100644 --- a/noxfile.py +++ b/noxfile.py @@ -92,6 +92,9 @@ def tests(session: Session) -> None: "-v", "--durations=10", *session.posargs, + env={ + "SQLALCHEMY_WARN_20": "1", + }, ) finally: if session.interactive: diff --git a/pyproject.toml b/pyproject.toml index f13d1613d..69cf5bbbb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -131,6 +131,9 @@ exclude = ".*simpleeval.*" [tool.pytest.ini_options] addopts = '-vvv --ignore=singer_sdk/helpers/_simpleeval.py -m "not external"' +filterwarnings = [ + "error::sqlalchemy.exc.RemovedIn20Warning", +] markers = [ "external: Tests relying on external resources", "windows: Tests that only run on Windows", diff --git a/samples/sample_tap_sqlite/__init__.py b/samples/sample_tap_sqlite/__init__.py index c006418c5..baf637322 100644 --- a/samples/sample_tap_sqlite/__init__.py +++ b/samples/sample_tap_sqlite/__init__.py @@ -4,8 +4,6 @@ from typing import Any -import sqlalchemy - from singer_sdk import SQLConnector, SQLStream, SQLTap from singer_sdk import typing as th @@ -22,21 +20,6 @@ def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: """Generates a SQLAlchemy URL for SQLite.""" return f"sqlite:///{config[DB_PATH_CONFIG]}" - def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: - """Return a new SQLAlchemy connection using the provided config. - - This override simply provides a more helpful error message on failure. - - Returns: - A newly created SQLAlchemy engine object. - """ - try: - return super().create_sqlalchemy_connection() - except Exception as ex: - raise RuntimeError( - f"Error connecting to DB at '{self.config[DB_PATH_CONFIG]}': {ex}" - ) from ex - class SQLiteStream(SQLStream): """The Stream class for SQLite. diff --git a/samples/sample_target_sqlite/__init__.py b/samples/sample_target_sqlite/__init__.py index f1b0ca66b..ea05a6213 100644 --- a/samples/sample_target_sqlite/__init__.py +++ b/samples/sample_target_sqlite/__init__.py @@ -4,8 +4,6 @@ from typing import Any -import sqlalchemy - from singer_sdk import SQLConnector, SQLSink, SQLTarget from singer_sdk import typing as th @@ -26,21 +24,6 @@ def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: """Generates a SQLAlchemy URL for SQLite.""" return f"sqlite:///{config[DB_PATH_CONFIG]}" - def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection: - """Return a new SQLAlchemy connection using the provided config. - - This override simply provides a more helpful error message on failure. - - Returns: - A newly created SQLAlchemy engine object. - """ - try: - return super().create_sqlalchemy_connection() - except Exception as ex: - raise RuntimeError( - f"Error connecting to DB at '{self.config[DB_PATH_CONFIG]}'" - ) from ex - class SQLiteSink(SQLSink): """The Sink class for SQLite. diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 437e97134..d33753e3e 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -683,7 +683,7 @@ def _create_empty_column( column_add_ddl = self.get_column_add_ddl( table_name=full_table_name, column_name=column_name, column_type=sql_type ) - with self._connect() as conn: + with self._connect() as conn, conn.begin(): conn.execute(column_add_ddl) def prepare_schema(self, schema_name: str) -> None: diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 88128364a..b0f220198 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -324,7 +324,8 @@ def bulk_insert_records( else (self.conform_record(record) for record in records) ) self.logger.info("Inserting with SQL: %s", insert_sql) - self.connector.connection.execute(insert_sql, conformed_records) + with self.connector._connect() as conn, conn.begin(): + conn.execute(insert_sql, conformed_records) return len(conformed_records) if isinstance(conformed_records, list) else None def merge_upsert_from_table( @@ -371,10 +372,13 @@ def activate_version(self, new_version: int) -> None: ) if self.config.get("hard_delete", True): - self.connection.execute( - f"DELETE FROM {self.full_table_name} " - f"WHERE {self.version_column_name} <= {new_version}" - ) + with self.connector._connect() as conn, conn.begin(): + conn.execute( + sqlalchemy.text( + f"DELETE FROM {self.full_table_name} " + f"WHERE {self.version_column_name} <= {new_version}" + ) + ) return if not self.connector.column_exists( @@ -397,7 +401,8 @@ def activate_version(self, new_version: int) -> None: bindparam("deletedate", value=deleted_at, type_=sqlalchemy.types.DateTime), bindparam("version", value=new_version, type_=sqlalchemy.types.Integer), ) - self.connector.connection.execute(query) + with self.connector._connect() as conn, conn.begin(): + conn.execute(query) __all__ = ["SQLSink", "SQLConnector"] diff --git a/singer_sdk/streams/sql.py b/singer_sdk/streams/sql.py index a2a9a9bf8..59272d01f 100644 --- a/singer_sdk/streams/sql.py +++ b/singer_sdk/streams/sql.py @@ -197,8 +197,9 @@ def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]: if self._MAX_RECORDS_LIMIT is not None: query = query.limit(self._MAX_RECORDS_LIMIT) - for record in self.connector.connection.execute(query): - yield dict(record) + with self.connector._connect() as conn: + for record in conn.execute(query): + yield dict(record._mapping) __all__ = ["SQLStream", "SQLConnector"] diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py index f7e798603..9b12db534 100644 --- a/tests/core/test_connector_sql.py +++ b/tests/core/test_connector_sql.py @@ -170,7 +170,7 @@ def test_connect_raises_on_operational_failure(self, connector): with pytest.raises( sqlalchemy.exc.OperationalError ) as _, connector._connect() as conn: - conn.execute("SELECT * FROM fake_table") + conn.execute(sqlalchemy.text("SELECT * FROM fake_table")) def test_rename_column_uses_connect_correctly(self, connector): attached_engine = connector._engine diff --git a/tests/samples/conftest.py b/tests/samples/conftest.py index 3b46bc939..21c612070 100644 --- a/tests/samples/conftest.py +++ b/tests/samples/conftest.py @@ -5,6 +5,7 @@ from pathlib import Path import pytest +from sqlalchemy import text from samples.sample_tap_sqlite import SQLiteConnector, SQLiteTap from singer_sdk._singerlib import Catalog @@ -20,15 +21,14 @@ def csv_config(outdir: str) -> dict: @pytest.fixture def _sqlite_sample_db(sqlite_connector): """Return a path to a newly constructed sample DB.""" - for t in range(3): - sqlite_connector.connection.execute(f"DROP TABLE IF EXISTS t{t}") - sqlite_connector.connection.execute( - f"CREATE TABLE t{t} (c1 int PRIMARY KEY, c2 varchar(10))" - ) - for x in range(100): - sqlite_connector.connection.execute( - f"INSERT INTO t{t} VALUES ({x}, 'x={x}')" + with sqlite_connector._connect() as conn, conn.begin(): + for t in range(3): + conn.execute(text(f"DROP TABLE IF EXISTS t{t}")) + conn.execute( + text(f"CREATE TABLE t{t} (c1 int PRIMARY KEY, c2 varchar(10))") ) + for x in range(100): + conn.execute(text(f"INSERT INTO t{t} VALUES ({x}, 'x={x}')")) @pytest.fixture diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 7e60674f1..a1f97eb35 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -47,10 +47,7 @@ def sqlite_sample_target(sqlite_target_test_config): @pytest.fixture def sqlite_sample_target_soft_delete(sqlite_target_test_config): """Get a sample target object with hard_delete disabled.""" - conf = sqlite_target_test_config - conf["hard_delete"] = False - - return SQLiteTarget(conf) + return SQLiteTarget({**sqlite_target_test_config, "hard_delete": False}) @pytest.fixture