Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5b00a3f
refactor: Allow a list of SQL to be passed to the GenericTransfer jus…
dabla Sep 24, 2025
7b8415d
refactor: Reformatted execute method in GenericTransfer
dabla Sep 24, 2025
63efd0c
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Sep 24, 2025
d6ed8c8
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Sep 24, 2025
7074606
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Sep 25, 2025
f2585f5
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Sep 28, 2025
89599fe
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 4, 2025
9cedce3
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 5, 2025
ad52a83
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 5, 2025
465ec67
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 7, 2025
36a7de1
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 7, 2025
c1052e6
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 8, 2025
5a7bd64
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 8, 2025
ab0dd84
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 22, 2025
faf6487
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 23, 2025
4dfd072
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 28, 2025
62a983f
Update providers/common/sql/src/airflow/providers/common/sql/operator…
dabla Oct 28, 2025
e07d2ee
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 28, 2025
a0e7a2d
refactor: Reverted list to str for sql parameter in test GenericTransfer
dabla Oct 28, 2025
881f447
refactor: Move check of sql parameter in constructor of GenericTransfer
dabla Oct 28, 2025
8224363
refactor: Added unit test for GenericTransfer when multiple SQL state…
dabla Oct 28, 2025
3cb7f55
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 28, 2025
f05f994
Revert "refactor: Move check of sql parameter in constructor of Gener…
dabla Oct 28, 2025
07790ee
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 28, 2025
c1f39d4
Merge branch 'main' into feature/allow-multiple-sql-statements-in-gen…
dabla Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class GenericTransfer(BaseOperator):
def __init__(
self,
*,
sql: str,
sql: str | list[str],
destination_table: str,
source_conn_id: str,
source_hook_params: dict | None = None,
Expand Down Expand Up @@ -156,13 +156,19 @@ def execute(self, context: Context):
method_name=self.execute_complete.__name__,
)
else:
if isinstance(self.sql, str):
self.sql = [self.sql]

self.log.info("Extracting data from %s", self.source_conn_id)
self.log.info("Executing: \n %s", self.sql)
for sql in self.sql:
self.log.info("Executing: \n %s", sql)

results = self.source_hook.get_records(self.sql)
results = self.source_hook.get_records(sql)

self.log.info("Inserting rows into %s", self.destination_conn_id)
self.destination_hook.insert_rows(table=self.destination_table, rows=results, **self.insert_args)
self.log.info("Inserting rows into %s", self.destination_conn_id)
self.destination_hook.insert_rows(
table=self.destination_table, rows=results, **self.insert_args
)

def execute_complete(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class GenericTransfer(BaseOperator):
def __init__(
self,
*,
sql: str,
sql: str | list[str],
destination_table: str,
source_conn_id: str,
source_hook_params: dict | None = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,38 @@ def test_non_paginated_read(self):
**{"rows": [[1, 2], [11, 12], [3, 4], [13, 14], [3, 4], [13, 14]], "table": "NEW_HR.EMPLOYEES"},
}

def test_non_paginated_read_for_multiple_sql_statements(self):
with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_connection", side_effect=self.get_connection):
with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_hook", side_effect=self.get_hook):
operator = GenericTransfer(
task_id="transfer_table",
source_conn_id="my_source_conn_id",
destination_conn_id="my_destination_conn_id",
sql=["SELECT * FROM HR.EMPLOYEES", "SELECT * FROM HR.PEOPLE"],
destination_table="NEW_HR.EMPLOYEES",
insert_args=INSERT_ARGS,
execution_timeout=timedelta(hours=1),
)

operator.execute(context=mock_context(task=operator))

assert self.mocked_source_hook.get_records.call_count == 2
assert [call.args[0] for call in self.mocked_source_hook.get_records.call_args_list] == [
"SELECT * FROM HR.EMPLOYEES",
"SELECT * FROM HR.PEOPLE",
]
assert self.mocked_destination_hook.insert_rows.call_count == 2
assert self.mocked_destination_hook.insert_rows.call_args_list[0].kwargs == {
**INSERT_ARGS,
"rows": [[1, 2], [11, 12], [3, 4], [13, 14], [3, 4], [13, 14]],
"table": "NEW_HR.EMPLOYEES",
}
assert self.mocked_destination_hook.insert_rows.call_args_list[1].kwargs == {
**INSERT_ARGS,
"rows": [[1, 2], [11, 12], [3, 4], [13, 14], [3, 4], [13, 14]],
"table": "NEW_HR.EMPLOYEES",
}

def test_paginated_read(self):
"""
This unit test is based on the example described in the medium article:
Expand Down