Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e8664a6
refactor: Added paginated_sql_statement_format as a GenericTransfer p…
davidblain-infrabel Apr 7, 2025
186148c
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
85f6987
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
8369986
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
1a289f1
refactor: Made the paginated_sql_statement_format param public instea…
davidblain-infrabel Apr 7, 2025
35c7ca8
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
3f31ff8
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
a3c5d78
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
1818f78
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
e695d62
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 7, 2025
bd2de31
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 8, 2025
6d1188f
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 8, 2025
c37653b
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 10, 2025
d361cc3
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 14, 2025
7b41fb3
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Apr 29, 2025
923c644
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla May 6, 2025
c9ec86a
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla May 27, 2025
f5aa626
refactor: Renamed paginated_sql_statement_format to paginated_sql_sta…
davidblain-infrabel Jun 4, 2025
0da72fd
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Jun 4, 2025
6ceeed3
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Jun 4, 2025
27bd595
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Jun 6, 2025
0ac0b1e
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Jun 12, 2025
8c04b04
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Jul 3, 2025
36689e6
refactor: Fixed and removed duplicate test_templated_fields
davidblain-infrabel Aug 6, 2025
9aba9d3
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 6, 2025
f2c692c
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 6, 2025
acba149
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 6, 2025
57b46c4
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 6, 2025
9bea3f5
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 12, 2025
2597333
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 12, 2025
3126e0a
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 13, 2025
15e7a0b
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 13, 2025
5afe4d2
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 13, 2025
533ac6b
Merge branch 'main' into feature/add-paginated-sql-param-generictransfer
dabla Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class GenericTransfer(BaseOperator):
executed prior to loading the data. (templated)
:param insert_args: extra params for `insert_rows` method.
:param page_size: number of records to be read in paginated mode (optional).
:param paginated_sql_statement_clause: SQL statement clause to be used for pagination (optional).
"""

template_fields: Sequence[str] = (
Expand All @@ -65,6 +66,8 @@ class GenericTransfer(BaseOperator):
"destination_table",
"preoperator",
"insert_args",
"page_size",
"paginated_sql_statement_clause",
)
template_ext: Sequence[str] = (
".sql",
Expand All @@ -85,6 +88,7 @@ def __init__(
preoperator: str | list[str] | None = None,
insert_args: dict | None = None,
page_size: int | None = None,
paginated_sql_statement_clause: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -97,9 +101,7 @@ def __init__(
self.preoperator = preoperator
self.insert_args = insert_args or {}
self.page_size = page_size
self._paginated_sql_statement_format = kwargs.get(
"paginated_sql_statement_format", "{} LIMIT {} OFFSET {}"
)
self.paginated_sql_statement_clause = paginated_sql_statement_clause or "{} LIMIT {} OFFSET {}"

@classmethod
def get_hook(cls, conn_id: str, hook_params: dict | None = None) -> DbApiHook:
Expand All @@ -126,7 +128,7 @@ def destination_hook(self) -> DbApiHook:

def get_paginated_sql(self, offset: int) -> str:
"""Format the paginated SQL statement using the current format."""
return self._paginated_sql_statement_format.format(self.sql, self.page_size, offset)
return self.paginated_sql_statement_clause.format(self.sql, self.page_size, offset)

def render_template_fields(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ def test_templated_fields(self):
destination_conn_id="{{ destination_conn_id }}",
preoperator="{{ preoperator }}",
insert_args="{{ insert_args }}",
page_size="{{ page_size }}",
paginated_sql_statement_clause="{{ paginated_sql_statement_clause }}",
dag=dag,
)
operator.render_template_fields(
Expand All @@ -251,6 +253,8 @@ def test_templated_fields(self):
"destination_conn_id": "my_destination_conn_id",
"preoperator": "my_preoperator",
"insert_args": {"commit_every": 5000, "executemany": True, "replace": True},
"page_size": 1000,
"paginated_sql_statement_clause": "{} OFFSET {} ROWS FETCH NEXT {} ROWS ONLY;",
}
)
assert operator.sql == "my_sql"
Expand All @@ -259,6 +263,8 @@ def test_templated_fields(self):
assert operator.destination_conn_id == "my_destination_conn_id"
assert operator.preoperator == "my_preoperator"
assert operator.insert_args == {"commit_every": 5000, "executemany": True, "replace": True}
assert operator.page_size == 1000
assert operator.paginated_sql_statement_clause == "{} OFFSET {} ROWS FETCH NEXT {} ROWS ONLY;"

def test_non_paginated_read(self):
with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_connection", side_effect=self.get_connection):
Expand Down
Loading