diff --git a/airflow-core/src/airflow/models/connection.py b/airflow-core/src/airflow/models/connection.py index 1567cab1dd2bc..167df13c80c9e 100644 --- a/airflow-core/src/airflow/models/connection.py +++ b/airflow-core/src/airflow/models/connection.py @@ -241,6 +241,11 @@ def _parse_from_uri(self, uri: str): if self.EXTRA_KEY in query: self.extra = query[self.EXTRA_KEY] else: + for key, value in query.items(): + try: + query[key] = json.loads(value) + except (JSONDecodeError, TypeError): + self.log.info("Failed parsing the json for key %s", key) self.extra = json.dumps(query) @staticmethod @@ -316,15 +321,22 @@ def get_uri(self) -> str: uri += host_block if self.extra: + extra_dict = self.extra_dejson + can_flatten = True + for value in extra_dict.values(): + if not isinstance(value, str): + can_flatten = False + break + try: - query: str | None = urlencode(self.extra_dejson) + query: str | None = urlencode(extra_dict) except TypeError: query = None - if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)): + + if can_flatten and query and extra_dict == dict(parse_qsl(query, keep_blank_values=True)): uri += ("?" if self.schema else "/?") + query else: uri += ("?" if self.schema else "/?") + urlencode({self.EXTRA_KEY: self.extra}) - return uri def get_password(self) -> str | None: diff --git a/airflow-core/tests/unit/models/test_connection.py b/airflow-core/tests/unit/models/test_connection.py index 6fdea1b2d0553..ba875ad38d4b4 100644 --- a/airflow-core/tests/unit/models/test_connection.py +++ b/airflow-core/tests/unit/models/test_connection.py @@ -129,6 +129,28 @@ class TestConnection: None, r"Invalid connection string: type://user:pass@protocol://host:port?param=value.", ), + ( + "type://host?int_param=123&bool_param=true&float_param=1.5&str_param=some_str", + "type", + "host", + None, + None, + None, + "", + {"int_param": 123, "bool_param": True, "float_param": 1.5, "str_param": "some_str"}, + None, + ), + ( + "type://host?__extra__=%7B%22foo%22%3A+%22bar%22%7D", + "type", + "host", + None, + None, + None, + "", + {"foo": "bar"}, + None, + ), ], ) def test_parse_from_uri( @@ -193,6 +215,14 @@ def test_parse_from_uri( ), "type://protocol://user:pass@host:100/schema?param1=val1¶m2=val2", ), + ( + Connection( + conn_type="type", + host="host", + extra={"bool_param": True, "int_param": 123, "float_param": 1.5, "list_param": [1, 2]}, + ), + "type://host/?__extra__=%7B%22bool_param%22%3A+true%2C+%22int_param%22%3A+123%2C+%22float_param%22%3A+1.5%2C+%22list_param%22%3A+%5B1%2C+2%5D%7D", + ), ], ) def test_get_uri(self, connection, expected_uri):