From 77425222d4000ce03dd4be0dfbb4358a8b7c8b03 Mon Sep 17 00:00:00 2001 From: Harshith Gandhe Date: Tue, 16 Dec 2025 14:47:04 +0530 Subject: [PATCH 1/4] Add result_backend_transport_options for Redis Sentinel support in Celery --- providers/celery/provider.yaml | 29 +++++++ .../celery/executors/default_celery.py | 15 ++++ .../celery/executors/test_celery_executor.py | 87 +++++++++++++++++++ 3 files changed, 131 insertions(+) diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml index cd4b72705a0bb..869485c9d2f1f 100644 --- a/providers/celery/provider.yaml +++ b/providers/celery/provider.yaml @@ -380,3 +380,32 @@ config: sensitive: true example: '{"password": "password_for_redis_server"}' default: ~ + celery_result_backend_transport_options: + description: | + This section is for specifying options which can be passed to the + underlying celery result backend transport. This is particularly useful when using + Redis Sentinel as the result backend. See: + https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options + options: + master_name: + description: | + The name of the Redis Sentinel master to connect to. + Required when using Redis Sentinel as the result backend. + version_added: ~ + type: string + example: "mymaster" + default: ~ + sentinel_kwargs: + description: | + The sentinel_kwargs parameter allows passing additional options to the Sentinel client + for the result backend. In a typical scenario where Redis Sentinel is used as the + result backend and Redis servers are password-protected, the password needs to be + passed through this parameter. Although its type is string, it is required to pass + a string that conforms to the dictionary format. + See: + https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration + version_added: ~ + type: string + sensitive: true + example: '{"password": "password_for_redis_server"}' + default: ~ diff --git a/providers/celery/src/airflow/providers/celery/executors/default_celery.py b/providers/celery/src/airflow/providers/celery/executors/default_celery.py index 4d49be0cc9584..284a492ca2bcc 100644 --- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py @@ -70,6 +70,20 @@ def _broker_supports_visibility_timeout(url): log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.") result_backend = f"db+{conf.get('database', 'SQL_ALCHEMY_CONN')}" +# Handle result backend transport options (for Redis Sentinel support) +result_backend_transport_options: dict = conf.getsection("celery_result_backend_transport_options") or {} +if "sentinel_kwargs" in result_backend_transport_options: + try: + result_sentinel_kwargs = json.loads(result_backend_transport_options["sentinel_kwargs"]) + if not isinstance(result_sentinel_kwargs, dict): + raise ValueError + result_backend_transport_options["sentinel_kwargs"] = result_sentinel_kwargs + except Exception: + raise AirflowException( + "sentinel_kwargs in [celery_result_backend_transport_options] should be written " + "in the correct dictionary format." + ) + extra_celery_config = conf.getjson("celery", "extra_celery_config", fallback={}) DEFAULT_CELERY_CONFIG = { @@ -86,6 +100,7 @@ def _broker_supports_visibility_timeout(url): "celery", "broker_connection_retry_on_startup", fallback=True ), "result_backend": result_backend, + "result_backend_transport_options": result_backend_transport_options, "database_engine_options": conf.getjson( "celery", "result_backend_sqlalchemy_engine_options", fallback={} ), diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py index 00b40d59a2a10..6005a63274825 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py @@ -466,3 +466,90 @@ def test_celery_extra_celery_config_loaded_from_string(): # reload celery conf to apply the new config importlib.reload(default_celery) assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10 + + +@conf_vars( + {("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}'} +) +def test_result_backend_sentinel_kwargs_loaded_from_string(): + """Test that sentinel_kwargs for result backend transport options is correctly parsed.""" + import importlib + + # reload celery conf to apply the new config + importlib.reload(default_celery) + assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG + assert default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["sentinel_kwargs"] == { + "password": "redis_password" + } + + +@conf_vars({("celery_result_backend_transport_options", "master_name"): "mymaster"}) +def test_result_backend_master_name_loaded(): + """Test that master_name for result backend transport options is correctly loaded.""" + import importlib + + # reload celery conf to apply the new config + importlib.reload(default_celery) + assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG + assert default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["master_name"] == "mymaster" + + +@conf_vars( + { + ("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}', + ("celery_result_backend_transport_options", "master_name"): "mymaster", + } +) +def test_result_backend_transport_options_with_multiple_options(): + """Test that multiple result backend transport options are correctly loaded.""" + import importlib + + # reload celery conf to apply the new config + importlib.reload(default_celery) + result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"] + assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_password"} + assert result_backend_opts["master_name"] == "mymaster" + + +@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): "invalid_json"}) +def test_result_backend_sentinel_kwargs_invalid_json(): + """Test that invalid JSON in sentinel_kwargs raises an error.""" + import importlib + + from airflow.providers.common.compat.sdk import AirflowException + + with pytest.raises(AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"): + importlib.reload(default_celery) + + +@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '"not_a_dict"'}) +def test_result_backend_sentinel_kwargs_not_dict(): + """Test that non-dict sentinel_kwargs raises an error.""" + import importlib + + from airflow.providers.common.compat.sdk import AirflowException + + with pytest.raises(AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"): + importlib.reload(default_celery) + + +@conf_vars( + { + ("celery", "result_backend"): "sentinel://sentinel1:26379;sentinel://sentinel2:26379", + ("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_pass"}', + ("celery_result_backend_transport_options", "master_name"): "mymaster", + } +) +def test_result_backend_sentinel_full_config(): + """Test full Redis Sentinel configuration for result backend.""" + import importlib + + # reload celery conf to apply the new config + importlib.reload(default_celery) + + assert default_celery.DEFAULT_CELERY_CONFIG["result_backend"] == ( + "sentinel://sentinel1:26379;sentinel://sentinel2:26379" + ) + result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"] + assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_pass"} + assert result_backend_opts["master_name"] == "mymaster" From befa20d7c0d1deffefdd679ad5ff3becfa5a7f4c Mon Sep 17 00:00:00 2001 From: Harshith Gandhe Date: Wed, 17 Dec 2025 10:44:46 +0530 Subject: [PATCH 2/4] Fix language check --- providers/celery/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml index 869485c9d2f1f..0e665a91ffcc6 100644 --- a/providers/celery/provider.yaml +++ b/providers/celery/provider.yaml @@ -389,7 +389,7 @@ config: options: master_name: description: | - The name of the Redis Sentinel master to connect to. + The name of the Redis Sentinel primary node to connect to. Required when using Redis Sentinel as the result backend. version_added: ~ type: string From cf1b77ed323df3ed36e0387c96b11a3c567c5fa7 Mon Sep 17 00:00:00 2001 From: Harshith Gandhe Date: Sat, 20 Dec 2025 00:34:37 +0530 Subject: [PATCH 3/4] Fix static checks --- .../celery/executors/test_celery_executor.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py index 6005a63274825..6a7f0d8a84c77 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py @@ -468,9 +468,7 @@ def test_celery_extra_celery_config_loaded_from_string(): assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10 -@conf_vars( - {("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}'} -) +@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}'}) def test_result_backend_sentinel_kwargs_loaded_from_string(): """Test that sentinel_kwargs for result backend transport options is correctly parsed.""" import importlib @@ -491,7 +489,9 @@ def test_result_backend_master_name_loaded(): # reload celery conf to apply the new config importlib.reload(default_celery) assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG - assert default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["master_name"] == "mymaster" + assert ( + default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["master_name"] == "mymaster" + ) @conf_vars( @@ -518,7 +518,9 @@ def test_result_backend_sentinel_kwargs_invalid_json(): from airflow.providers.common.compat.sdk import AirflowException - with pytest.raises(AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"): + with pytest.raises( + AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format" + ): importlib.reload(default_celery) @@ -529,7 +531,9 @@ def test_result_backend_sentinel_kwargs_not_dict(): from airflow.providers.common.compat.sdk import AirflowException - with pytest.raises(AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"): + with pytest.raises( + AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format" + ): importlib.reload(default_celery) From fce434627adaa27a070c97c32ab033de08c53f2c Mon Sep 17 00:00:00 2001 From: Harshith Gandhe Date: Tue, 23 Dec 2025 23:29:12 +0530 Subject: [PATCH 4/4] Added info --- .../providers/celery/get_provider_info.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py index c5000fbbdfb5a..9c4b5cda2191b 100644 --- a/providers/celery/src/airflow/providers/celery/get_provider_info.py +++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py @@ -260,5 +260,25 @@ def get_provider_info(): }, }, }, + "celery_result_backend_transport_options": { + "description": "This section is for specifying options which can be passed to the\nunderlying celery result backend transport. This is particularly useful when using\nRedis Sentinel as the result backend. See:\nhttps://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options\n", + "options": { + "master_name": { + "description": "The name of the Redis Sentinel primary node to connect to.\nRequired when using Redis Sentinel as the result backend.\n", + "version_added": None, + "type": "string", + "example": "mymaster", + "default": None, + }, + "sentinel_kwargs": { + "description": "The sentinel_kwargs parameter allows passing additional options to the Sentinel client\nfor the result backend. In a typical scenario where Redis Sentinel is used as the\nresult backend and Redis servers are password-protected, the password needs to be\npassed through this parameter. Although its type is string, it is required to pass\na string that conforms to the dictionary format.\nSee:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration\n", + "version_added": None, + "type": "string", + "sensitive": True, + "example": '{"password": "password_for_redis_server"}', + "default": None, + }, + }, + }, }, }