Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions providers/celery/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,32 @@ config:
sensitive: true
example: '{"password": "password_for_redis_server"}'
default: ~
celery_result_backend_transport_options:
description: |
This section is for specifying options which can be passed to the
underlying celery result backend transport. This is particularly useful when using
Redis Sentinel as the result backend. See:
https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options
options:
master_name:
description: |
The name of the Redis Sentinel primary node to connect to.
Required when using Redis Sentinel as the result backend.
version_added: ~
type: string
example: "mymaster"
default: ~
sentinel_kwargs:
description: |
The sentinel_kwargs parameter allows passing additional options to the Sentinel client
for the result backend. In a typical scenario where Redis Sentinel is used as the
result backend and Redis servers are password-protected, the password needs to be
passed through this parameter. Although its type is string, it is required to pass
a string that conforms to the dictionary format.
See:
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
version_added: ~
type: string
sensitive: true
example: '{"password": "password_for_redis_server"}'
default: ~
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ def _broker_supports_visibility_timeout(url):
log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")
result_backend = f"db+{conf.get('database', 'SQL_ALCHEMY_CONN')}"

# Handle result backend transport options (for Redis Sentinel support)
result_backend_transport_options: dict = conf.getsection("celery_result_backend_transport_options") or {}
if "sentinel_kwargs" in result_backend_transport_options:
try:
result_sentinel_kwargs = json.loads(result_backend_transport_options["sentinel_kwargs"])
if not isinstance(result_sentinel_kwargs, dict):
raise ValueError
result_backend_transport_options["sentinel_kwargs"] = result_sentinel_kwargs
except Exception:
raise AirflowException(
"sentinel_kwargs in [celery_result_backend_transport_options] should be written "
"in the correct dictionary format."
)

extra_celery_config = conf.getjson("celery", "extra_celery_config", fallback={})

DEFAULT_CELERY_CONFIG = {
Expand All @@ -86,6 +100,7 @@ def _broker_supports_visibility_timeout(url):
"celery", "broker_connection_retry_on_startup", fallback=True
),
"result_backend": result_backend,
"result_backend_transport_options": result_backend_transport_options,
"database_engine_options": conf.getjson(
"celery", "result_backend_sqlalchemy_engine_options", fallback={}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,25 @@ def get_provider_info():
},
},
},
"celery_result_backend_transport_options": {
"description": "This section is for specifying options which can be passed to the\nunderlying celery result backend transport. This is particularly useful when using\nRedis Sentinel as the result backend. See:\nhttps://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-result_backend_transport_options\n",
"options": {
"master_name": {
"description": "The name of the Redis Sentinel primary node to connect to.\nRequired when using Redis Sentinel as the result backend.\n",
"version_added": None,
"type": "string",
"example": "mymaster",
"default": None,
},
"sentinel_kwargs": {
"description": "The sentinel_kwargs parameter allows passing additional options to the Sentinel client\nfor the result backend. In a typical scenario where Redis Sentinel is used as the\nresult backend and Redis servers are password-protected, the password needs to be\npassed through this parameter. Although its type is string, it is required to pass\na string that conforms to the dictionary format.\nSee:\nhttps://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration\n",
"version_added": None,
"type": "string",
"sensitive": True,
"example": '{"password": "password_for_redis_server"}',
"default": None,
},
},
},
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,94 @@ def test_celery_extra_celery_config_loaded_from_string():
# reload celery conf to apply the new config
importlib.reload(default_celery)
assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10


@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}'})
def test_result_backend_sentinel_kwargs_loaded_from_string():
"""Test that sentinel_kwargs for result backend transport options is correctly parsed."""
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)
assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG
assert default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["sentinel_kwargs"] == {
"password": "redis_password"
}


@conf_vars({("celery_result_backend_transport_options", "master_name"): "mymaster"})
def test_result_backend_master_name_loaded():
"""Test that master_name for result backend transport options is correctly loaded."""
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)
assert "result_backend_transport_options" in default_celery.DEFAULT_CELERY_CONFIG
assert (
default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]["master_name"] == "mymaster"
)


@conf_vars(
{
("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_password"}',
("celery_result_backend_transport_options", "master_name"): "mymaster",
}
)
def test_result_backend_transport_options_with_multiple_options():
"""Test that multiple result backend transport options are correctly loaded."""
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)
result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]
assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_password"}
assert result_backend_opts["master_name"] == "mymaster"


@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): "invalid_json"})
def test_result_backend_sentinel_kwargs_invalid_json():
"""Test that invalid JSON in sentinel_kwargs raises an error."""
import importlib

from airflow.providers.common.compat.sdk import AirflowException

with pytest.raises(
AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"
):
importlib.reload(default_celery)


@conf_vars({("celery_result_backend_transport_options", "sentinel_kwargs"): '"not_a_dict"'})
def test_result_backend_sentinel_kwargs_not_dict():
"""Test that non-dict sentinel_kwargs raises an error."""
import importlib

from airflow.providers.common.compat.sdk import AirflowException

with pytest.raises(
AirflowException, match="sentinel_kwargs.*should be written in the correct dictionary format"
):
importlib.reload(default_celery)


@conf_vars(
{
("celery", "result_backend"): "sentinel://sentinel1:26379;sentinel://sentinel2:26379",
("celery_result_backend_transport_options", "sentinel_kwargs"): '{"password": "redis_pass"}',
("celery_result_backend_transport_options", "master_name"): "mymaster",
}
)
def test_result_backend_sentinel_full_config():
"""Test full Redis Sentinel configuration for result backend."""
import importlib

# reload celery conf to apply the new config
importlib.reload(default_celery)

assert default_celery.DEFAULT_CELERY_CONFIG["result_backend"] == (
"sentinel://sentinel1:26379;sentinel://sentinel2:26379"
)
result_backend_opts = default_celery.DEFAULT_CELERY_CONFIG["result_backend_transport_options"]
assert result_backend_opts["sentinel_kwargs"] == {"password": "redis_pass"}
assert result_backend_opts["master_name"] == "mymaster"