diff --git a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py index 5eb1e1420b51a..8f0c46d52a228 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py @@ -890,7 +890,14 @@ def insert_rows( ) sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs) self.log.debug("Generated sql: %s", sql) - cur.executemany(sql, values) + + try: + cur.executemany(sql, values) + except Exception as e: + self.log.error("Generated sql: %s", sql) + self.log.error("Parameters: %s", values) + raise e + conn.commit() nb_rows += len(chunked_rows) self.log.info("Loaded %s rows into %s so far", nb_rows, table) @@ -899,7 +906,14 @@ def insert_rows( values = self._serialize_cells(row, conn) sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs) self.log.debug("Generated sql: %s", sql) - cur.execute(sql, values) + + try: + cur.execute(sql, values) + except Exception as e: + self.log.error("Generated sql: %s", sql) + self.log.error("Parameters: %s", values) + raise e + if commit_every and i % commit_every == 0: conn.commit() self.log.info("Loaded %s rows into %s so far", i, table) diff --git a/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py b/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py index e54fe98e7df55..6cccd66b35d15 100644 --- a/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py +++ b/providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py @@ -267,6 +267,29 @@ def test_insert_rows_as_generator_supports_executemany(self, caplog): self.cur.executemany.assert_any_call(sql, rows) + def test_insert_rows_logs_generated_sql_on_exception(self, caplog): + table = "table" + rows = [("What's",), ("up",), ("world",)] + + with caplog.at_level(logging.ERROR): + self.cur.executemany.side_effect = Exception("Boom!") + self.db_hook.supports_executemany = True + + with pytest.raises(Exception, match="Boom!"): + self.db_hook.insert_rows(table, iter(rows)) + + assert self.conn.close.call_count == 1 + assert self.cur.close.call_count == 1 + assert self.conn.commit.call_count == 1 + + sql = f"INSERT INTO {table} VALUES (%s)" + + assert len(caplog.messages) == 2 + assert any(f"Generated sql: {sql}" in message for message in caplog.messages) + assert any(f"Parameters: {rows}" in message for message in caplog.messages) + + self.cur.executemany.assert_any_call(sql, rows) + def test_get_uri_schema_not_none(self): self.db_hook.get_connection = mock.MagicMock( return_value=Connection(