Skip to content
18 changes: 16 additions & 2 deletions providers/common/sql/src/airflow/providers/common/sql/hooks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,14 @@ def insert_rows(
)
sql = self._generate_insert_sql(table, values[0], target_fields, replace, **kwargs)
self.log.debug("Generated sql: %s", sql)
cur.executemany(sql, values)

try:
cur.executemany(sql, values)
except Exception as e:
self.log.error("Generated sql: %s", sql)
self.log.error("Parameters: %s", values)
raise e

conn.commit()
nb_rows += len(chunked_rows)
self.log.info("Loaded %s rows into %s so far", nb_rows, table)
Expand All @@ -899,7 +906,14 @@ def insert_rows(
values = self._serialize_cells(row, conn)
sql = self._generate_insert_sql(table, values, target_fields, replace, **kwargs)
self.log.debug("Generated sql: %s", sql)
cur.execute(sql, values)

try:
cur.execute(sql, values)
except Exception as e:
self.log.error("Generated sql: %s", sql)
self.log.error("Parameters: %s", values)
raise e

if commit_every and i % commit_every == 0:
conn.commit()
self.log.info("Loaded %s rows into %s so far", i, table)
Expand Down
23 changes: 23 additions & 0 deletions providers/common/sql/tests/unit/common/sql/hooks/test_dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,29 @@ def test_insert_rows_as_generator_supports_executemany(self, caplog):

self.cur.executemany.assert_any_call(sql, rows)

def test_insert_rows_logs_generated_sql_on_exception(self, caplog):
table = "table"
rows = [("What's",), ("up",), ("world",)]

with caplog.at_level(logging.ERROR):
self.cur.executemany.side_effect = Exception("Boom!")
self.db_hook.supports_executemany = True

with pytest.raises(Exception, match="Boom!"):
self.db_hook.insert_rows(table, iter(rows))

assert self.conn.close.call_count == 1
assert self.cur.close.call_count == 1
assert self.conn.commit.call_count == 1

sql = f"INSERT INTO {table} VALUES (%s)"

assert len(caplog.messages) == 2
assert any(f"Generated sql: {sql}" in message for message in caplog.messages)
assert any(f"Parameters: {rows}" in message for message in caplog.messages)

self.cur.executemany.assert_any_call(sql, rows)

def test_get_uri_schema_not_none(self):
self.db_hook.get_connection = mock.MagicMock(
return_value=Connection(
Expand Down