From 0ec04f77ed2a5bfe1a557702271d83cd744b449a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 1 Oct 2024 16:19:49 -0700 Subject: [PATCH 1/5] add 'execute_sql' command on caches --- airbyte/_processors/sql/duckdb.py | 17 ++++++++++++++++- airbyte/caches/base.py | 25 +++++++++++++++++++++++++ airbyte/shared/sql_processor.py | 13 ++++++++++++- 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/airbyte/_processors/sql/duckdb.py b/airbyte/_processors/sql/duckdb.py index 35162f63..27e5efb4 100644 --- a/airbyte/_processors/sql/duckdb.py +++ b/airbyte/_processors/sql/duckdb.py @@ -11,6 +11,7 @@ from duckdb_engine import DuckDBEngineWarning from overrides import overrides from pydantic import Field +from sqlalchemy import text from airbyte._writers.jsonl import JsonlWriter from airbyte.secrets.base import SecretString @@ -19,7 +20,7 @@ if TYPE_CHECKING: - from sqlalchemy.engine import Engine + from sqlalchemy.engine import Connection, Engine # @dataclass @@ -161,3 +162,17 @@ def _write_files_to_new_table( ) self._execute_sql(insert_statement) return temp_table_name + + def _close_connection( + self, + connection: Connection, + ) -> None: + """Close the given connection. + + We override this method to ensure that the DuckDB WAL is checkpointed before closing. + + For more info: + - https://duckdb.org/docs/sql/statements/checkpoint.html + """ + connection.execute(text("CHECKPOINT")) + super()._close_connection(connection) diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index ce917358..775a64b3 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -10,6 +10,7 @@ import pyarrow as pa import pyarrow.dataset as ds from pydantic import Field, PrivateAttr +from sqlalchemy import text from airbyte_protocol.models import ConfiguredAirbyteCatalog @@ -110,6 +111,30 @@ def config_hash(self) -> str | None: """ return super(SqlConfig, self).config_hash + def execute_sql(self, sql: str | list[str]) -> None: + """Execute one or more SQL statements against the cache's SQL backend. + + If multiple SQL statements are given, they are executed in order, + within the same transaction. + + This method is useful for creating tables, indexes, and other + schema objects in the cache. It does not return any results and it + automatically closes the connection after executing all statements. + + This method is not intended for querying data. For that, use the `get_records` + method - or for a low-level interface, use the `get_sql_engine` method. + + If any of the statements fail, the transaction is canceled and an exception + is raised. Most databases will rollback the transaction in this case. + """ + if isinstance(sql, str): + # Coerce to a list if a single string is given + sql = [sql] + + with self.processor.get_sql_connection() as connection: + for sql_statement in sql: + connection.execute(text(sql_statement)) + @final @property def processor(self) -> SqlProcessorBase: diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index 8a1afb41..bc8878d7 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -368,6 +368,17 @@ def get_sql_engine(self) -> Engine: """Return a new SQL engine to use.""" return self.sql_config.get_sql_engine() + def _close_connection( + self, + connection: Connection, + ) -> None: + """Close the given connection. + + Subclasses can override this method to perform additional cleanup, such + as WAL checkpointing. + """ + connection.close() + @contextmanager def get_sql_connection(self) -> Generator[sqlalchemy.engine.Connection, None, None]: """A context manager which returns a new SQL connection for running queries. @@ -378,7 +389,7 @@ def get_sql_connection(self) -> Generator[sqlalchemy.engine.Connection, None, No self._init_connection_settings(connection) yield connection - connection.close() + self._close_connection(connection) del connection def get_sql_table_name( From 0b39ac0ab1fc1edfcb109a3ae97dbe9a5bb1b62a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 1 Oct 2024 16:55:45 -0700 Subject: [PATCH 2/5] move _close_connection -> _do_checkpoint --- airbyte/_processors/sql/duckdb.py | 18 ++++++++++++------ airbyte/shared/sql_processor.py | 25 +++++++++++++------------ airbyte/sources/base.py | 4 ++++ 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/airbyte/_processors/sql/duckdb.py b/airbyte/_processors/sql/duckdb.py index 27e5efb4..107e0092 100644 --- a/airbyte/_processors/sql/duckdb.py +++ b/airbyte/_processors/sql/duckdb.py @@ -163,16 +163,22 @@ def _write_files_to_new_table( self._execute_sql(insert_statement) return temp_table_name - def _close_connection( + def _do_checkpoint( self, - connection: Connection, + connection: Connection | None, ) -> None: - """Close the given connection. + """Checkpoint the given connection. - We override this method to ensure that the DuckDB WAL is checkpointed before closing. + We override this method to ensure that the DuckDB WAL is checkpointed explicitly. + Otherwise DuckDB will lazily flush the WAL to disk, which can cause issues for users + who want to manipulate the DB files after writing them. For more info: - https://duckdb.org/docs/sql/statements/checkpoint.html """ - connection.execute(text("CHECKPOINT")) - super()._close_connection(connection) + if connection is not None: + connection.execute(text("CHECKPOINT")) + return + + with self.get_sql_connection() as new_conn: + new_conn.execute(text("CHECKPOINT")) diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index bc8878d7..6e000035 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -346,6 +346,19 @@ def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract """ pass + def _do_checkpoint( # noqa: B027 # Intentionally empty, not abstract + self, + connection: Connection | None, + ) -> None: + """Checkpoint the given connection. + + If the WAL log needs to be, it will be flushed. + + For most SQL databases, this is a no-op. However, it exists so that + subclasses can override this method to perform a checkpoint operation. + """ + pass + # Public interface: @property @@ -368,17 +381,6 @@ def get_sql_engine(self) -> Engine: """Return a new SQL engine to use.""" return self.sql_config.get_sql_engine() - def _close_connection( - self, - connection: Connection, - ) -> None: - """Close the given connection. - - Subclasses can override this method to perform additional cleanup, such - as WAL checkpointing. - """ - connection.close() - @contextmanager def get_sql_connection(self) -> Generator[sqlalchemy.engine.Connection, None, None]: """A context manager which returns a new SQL connection for running queries. @@ -389,7 +391,6 @@ def get_sql_connection(self) -> Generator[sqlalchemy.engine.Connection, None, No self._init_connection_settings(connection) yield connection - self._close_connection(connection) del connection def get_sql_table_name( diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 02eec626..aeffc8ed 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -748,6 +748,10 @@ def _read_to_cache( # noqa: PLR0913 # Too many arguments state_writer=state_writer, progress_tracker=progress_tracker, ) + + # Flush the WAL, if applicable + cache.processor._do_checkpoint() # noqa: SLF001 # Non-public API + return ReadResult( source_name=self.name, progress_tracker=progress_tracker, From ac75729eb63ab831bff47b061a485cd06cce4532 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 1 Oct 2024 17:02:15 -0700 Subject: [PATCH 3/5] Update airbyte/_processors/sql/duckdb.py --- airbyte/_processors/sql/duckdb.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_processors/sql/duckdb.py b/airbyte/_processors/sql/duckdb.py index 107e0092..cdb649e4 100644 --- a/airbyte/_processors/sql/duckdb.py +++ b/airbyte/_processors/sql/duckdb.py @@ -165,7 +165,7 @@ def _write_files_to_new_table( def _do_checkpoint( self, - connection: Connection | None, + connection: Connection | None = None, ) -> None: """Checkpoint the given connection. From 2c55d645022e8743365acaa20dd8043daca3de50 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 1 Oct 2024 17:03:46 -0700 Subject: [PATCH 4/5] Update sql_processor.py --- airbyte/shared/sql_processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index 6e000035..2906fa25 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -391,6 +391,7 @@ def get_sql_connection(self) -> Generator[sqlalchemy.engine.Connection, None, No self._init_connection_settings(connection) yield connection + connection.close() del connection def get_sql_table_name( From 5ef4986a3ba82267d1d5a9d0987b81df3bc27585 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 2 Oct 2024 08:56:07 -0700 Subject: [PATCH 5/5] default to 'None' for connection to checkpoint --- airbyte/shared/sql_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index 2906fa25..d4899b7e 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -348,7 +348,7 @@ def _setup(self) -> None: # noqa: B027 # Intentionally empty, not abstract def _do_checkpoint( # noqa: B027 # Intentionally empty, not abstract self, - connection: Connection | None, + connection: Connection | None = None, ) -> None: """Checkpoint the given connection.