diff --git a/providers/celery/provider.yaml b/providers/celery/provider.yaml
index cd4b72705a0bb..0e665a91ffcc6 100644
--- a/providers/celery/provider.yaml
+++ b/providers/celery/provider.yaml
@@ -380,3 +380,32 @@ config:
         sensitive: true
         example: '{"password": "password_for_redis_server"}'
         default: ~
+  celery_result_backend_transport_options:
+    description: |
+      This section is for specifying options which can be passed to the
+      underlying celery result backend transport. This is particularly useful when using
+      Redis Sentinel as the result backend. See:
+      https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options
+    options:
+      master_name:
+        description: |
+          The name of the Redis Sentinel primary node to connect to.
+          Required when using Redis Sentinel as the result backend.
+        version_added: ~
+        type: string
+        example: "mymaster"
+        default: ~
+      sentinel_kwargs:
+        description: |
+          The sentinel_kwargs parameter allows passing additional options to the Sentinel client
+          for the result backend. In a typical scenario where Redis Sentinel is used as the
+          result backend and Redis servers are password-protected, the password needs to be
+          passed through this parameter. Although its type is string, it is required to pass
+          a string that conforms to the dictionary format.
+          See:
+          https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
+        version_added: ~
+        type: string
+        sensitive: true
+        example: '{"password": "password_for_redis_server"}'
+        default: ~
diff --git a/providers/celery/src/airflow/providers/celery/executors/default_celery.py b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
index 4d49be0cc9584..284a492ca2bcc 100644
--- a/providers/celery/src/airflow/providers/celery/executors/default_celery.py
+++ b/providers/celery/src/airflow/providers/celery/executors/default_celery.py
@@ -70,6 +70,23 @@ def _broker_supports_visibility_timeout(url):
     log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")
     result_backend = f"db+{conf.get('database', 'SQL_ALCHEMY_CONN')}"
 
+# Handle result backend transport options (for Redis Sentinel support).
+# ``sentinel_kwargs`` is read from the config as a string, so it is decoded from JSON and
+# validated to be a dict before being stored in DEFAULT_CELERY_CONFIG.
+result_backend_transport_options: dict = conf.getsection("celery_result_backend_transport_options") or {}
+if "sentinel_kwargs" in result_backend_transport_options:
+    try:
+        result_sentinel_kwargs = json.loads(result_backend_transport_options["sentinel_kwargs"])
+        if not isinstance(result_sentinel_kwargs, dict):
+            raise ValueError("sentinel_kwargs must decode to a JSON object (dict)")
+        result_backend_transport_options["sentinel_kwargs"] = result_sentinel_kwargs
+    except (TypeError, ValueError) as err:
+        # Chain the original error so the cause of the malformed value stays in the traceback.
+        raise AirflowException(
+            "sentinel_kwargs in [celery_result_backend_transport_options] should be written "
+            "in the correct dictionary format."
+        ) from err
+
 extra_celery_config = conf.getjson("celery", "extra_celery_config", fallback={})
 
 DEFAULT_CELERY_CONFIG = {
@@ -86,6 +103,7 @@ def _broker_supports_visibility_timeout(url):
         "celery", "broker_connection_retry_on_startup", fallback=True
     ),
     "result_backend": result_backend,
+    "result_backend_transport_options": result_backend_transport_options,
     "database_engine_options": conf.getjson(
         "celery", "result_backend_sqlalchemy_engine_options", fallback={}
     ),
diff --git a/providers/celery/src/airflow/providers/celery/get_provider_info.py b/providers/celery/src/airflow/providers/celery/get_provider_info.py
index c5000fbbdfb5a..9c4b5cda2191b 100644
--- a/providers/celery/src/airflow/providers/celery/get_provider_info.py
+++ b/providers/celery/src/airflow/providers/celery/get_provider_info.py
@@ -260,5 +260,25 @@ def get_provider_info():
                     },
                 },
             },
+            "celery_result_backend_transport_options": {
+                "description": "This section is for specifying options which can be passed to the\nunderlying celery result backend transport. This is particularly useful when using\nRedis Sentinel as the result backend. See:\nhttps://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options\n",
+                "options": {
+                    "master_name": {
+                        "description": "The name of the Redis Sentinel primary node to connect to.\nRequired when using Redis Sentinel as the result backend.\n",
+                        "version_added": None,
+                        "type": "string",
+                        "example": "mymaster",
+                        "default": None,
+                    },
+                    "sentinel_kwargs": {
+                        "description": "The sentinel_kwargs parameter allows passing additional options to the Sentinel client\nfor the result backend. In a typical scenario where Redis Sentinel is used as the\nresult backend and Redis servers are password-protected, the password needs to be\npassed through this parameter. Although its type is string, it is required to pass\na string that conforms to the dictionary format.\nSee:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration\n",
+                        "version_added": None,
+                        "type": "string",
+                        "sensitive": True,
+                        "example": '{"password": "password_for_redis_server"}',
+                        "default": None,
+                    },
+                },
+            },
         },
     }
diff --git a/providers/celery/tests/unit/celery/executors/test_celery_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index 00b40d59a2a10..6a7f0d8a84c77 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -466,3 +466,94 @@ def test_celery_extra_celery_config_loaded_from_string():
     # reload celery conf to apply the new config
     importlib.reload(default_celery)
     assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10
+
+
+@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}'})
+def test_result_backend_sentinel_kwargs_loaded_from_string():
+    """Test that sentinel_kwargs for result backend transport options is correctly parsed."""
+    import importlib
+
+    # reload celery conf to apply the new config
+    importlib.reload(default_celery)
+    assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG
+    assert default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["sentinel_kwargs"] == {
+        "password": "redis_password"
+    }
+
+
+@conf_vars({("celery_result_backend_transport_options", "master_name"): "mymaster"})
+def test_result_backend_master_name_loaded():
+    """Test that master_name for result backend transport options is correctly loaded."""
+    import importlib
+
+    # reload celery conf to apply the new config
+    importlib.reload(default_celery)
+    assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG
+    assert (
+        default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["master_name"] == "mymaster"
+    )
+
+
+@conf_vars(
+    {
+        ("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}',
+        ("celery_result_backend_transport_options", "master_name"): "mymaster",
+    }
+)
+def test_result_backend_transport_options_with_multiple_options():
+    """Test that multiple result backend transport options are correctly loaded."""
+    import importlib
+
+    # reload celery conf to apply the new config
+    importlib.reload(default_celery)
+    result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]
+    assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_password"}
+    assert result_backend_opts["master_name"] == "mymaster"
+
+
+@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): "invalid_json"})
+def test_result_backend_sentinel_kwargs_invalid_json():
+    """Test that invalid JSON in sentinel_kwargs raises an error."""
+    import importlib
+
+    from airflow.providers.common.compat.sdk import AirflowException
+
+    with pytest.raises(
+        AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"
+    ):
+        importlib.reload(default_celery)
+
+
+@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '"not_a_dict"'})
+def test_result_backend_sentinel_kwargs_not_dict():
+    """Test that non-dict sentinel_kwargs raises an error."""
+    import importlib
+
+    from airflow.providers.common.compat.sdk import AirflowException
+
+    with pytest.raises(
+        AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"
+    ):
+        importlib.reload(default_celery)
+
+
+@conf_vars(
+    {
+        ("celery", "result_backend"): "sentinel://sentinel1:26379;sentinel://sentinel2:26379",
+        ("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_pass"}',
+        ("celery_result_backend_transport_options", "master_name"): "mymaster",
+    }
+)
+def test_result_backend_sentinel_full_config():
+    """Test full Redis Sentinel configuration for result backend."""
+    import importlib
+
+    # reload celery conf to apply the new config
+    importlib.reload(default_celery)
+
+    assert default_celery.DEFAULT_CELERY_CONFIG["result_backend"] == (
+        "sentinel://sentinel1:26379;sentinel://sentinel2:26379"
+    )
+    result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]
+    assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_pass"}
+    assert result_backend_opts["master_name"] == "mymaster"