diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py index 77ab8568cbc53..0208ea1a0a5f7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py @@ -64,11 +64,13 @@ def handle_bulk_create(self, action: BulkCreateAction, results: BulkActionRespon for variable in action.entities: if variable.key in create_keys: + should_serialize_json = isinstance(variable.value, (dict, list)) Variable.set( key=variable.key, value=variable.value, description=variable.description, session=self.session, + serialize_json=should_serialize_json, ) results.success.append(variable.key) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py index c33afefa30e97..57b8d9bd2a036 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_variables.py @@ -1060,6 +1060,55 @@ def test_bulk_variables(self, test_client, actions, expected_results, session): assert response_data[key] == value check_last_log(session, dag_id=None, event="bulk_variables", logical_date=None) + @pytest.mark.parametrize( + "entity_key, entity_value, entity_description", + [ + ( + "my_dict_var_param", + {"name": "Test Dict Param", "id": 123, "active": True}, + "A dict value (param)", + ), + ("my_list_var_param", ["alpha", 42, False, {"nested": "item param"}], "A list value (param)"), + ("my_string_var_param", "plain string param", "A plain string (param)"), + ], + ids=[ + "dict_variable", + "list_variable", + "string_variable", + ], + ) + def test_bulk_create_entity_serialization( + self, test_client, session, entity_key, entity_value, entity_description + ): + actions = { + "actions": [ + { + "action": "create", + "entities": [ + {"key": entity_key, "value": entity_value, "description": entity_description}, + ], + "action_on_existence": "fail", + } + ] + } + + response = test_client.patch("/variables", json=actions) + assert response.status_code == 200 + + if isinstance(entity_value, (dict, list)): + retrieved_value_deserialized = Variable.get(entity_key, deserialize_json=True) + assert retrieved_value_deserialized == entity_value + retrieved_value_raw_string = Variable.get(entity_key, deserialize_json=False) + assert retrieved_value_raw_string == json.dumps(entity_value, indent=2) + else: + retrieved_value_raw = Variable.get(entity_key, deserialize_json=False) + assert retrieved_value_raw == str(entity_value) + + with pytest.raises(json.JSONDecodeError): + Variable.get(entity_key, deserialize_json=True) + + check_last_log(session, dag_id=None, event="bulk_variables", logical_date=None) + def test_bulk_variables_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.patch("/variables", json={}) assert response.status_code == 401