Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import psycopg2
import psycopg2.extensions
import psycopg2.extras
from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor
from psycopg2.extras import DictCursor, Json, NamedTupleCursor, RealDictCursor
from sqlalchemy.engine import URL

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -217,16 +217,20 @@ def _serialize_cell(cell: object, conn: connection | None = None) -> Any:
"""
Serialize a cell.

PostgreSQL adapts all arguments to the ``execute()`` method internally,
hence we return the cell without any conversion.
In order to pass a Python object to the database as query argument you can use the
Json (class psycopg2.extras.Json) adapter.

See http://initd.org/psycopg/docs/advanced.html#adapting-new-types for
Reading from the database, json and jsonb values will be automatically converted to Python objects.

See https://www.psycopg.org/docs/extras.html#json-adaptation for
more information.

:param cell: The cell to insert into the table
:param conn: The database connection
:return: The cell
"""
if isinstance(cell, (dict, list)):
cell = Json(cell)
return cell

def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
Expand Down
19 changes: 19 additions & 0 deletions providers/postgres/tests/unit/postgres/hooks/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import psycopg2.extras
import pytest
import sqlalchemy
from psycopg2.extras import Json

from airflow.exceptions import AirflowException
from airflow.models import Connection
Expand Down Expand Up @@ -498,6 +499,24 @@ def test_bulk_dump(self, tmp_path):

assert sorted(input_data) == sorted(results)

@pytest.mark.parametrize(
"raw_cell, expected_serialized",
[
("cell content", "cell content"),
(342, 342),
(
{"key1": "value2", "n_key": {"sub_key": "sub_value"}},
{"key1": "value2", "n_key": {"sub_key": "sub_value"}},
),
([1, 2, {"key1": "value2"}, "some data"], [1, 2, {"key1": "value2"}, "some data"]),
],
)
def test_serialize_cell(self, raw_cell, expected_serialized):
if isinstance(raw_cell, Json):
assert expected_serialized == raw_cell.adapted
else:
assert expected_serialized == raw_cell

def test_insert_rows(self):
table = "table"
rows = [("hello",), ("world",)]
Expand Down