Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose celery #245

Merged
merged 23 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to Merlin will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- The ability to override any value of the celery configuration through `app.yaml` in `celery.override`.
- Two unit tests for the format of the broker URL and results backend.
ben-bay marked this conversation as resolved.
Show resolved Hide resolved

## [1.6.2]

### Added
Expand Down
64 changes: 23 additions & 41 deletions merlin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
from celery.signals import worker_process_init

import merlin.common.security.encrypt_backend_traffic
from merlin.config import broker, results_backend
from merlin.config import broker, celeryconfig, results_backend
from merlin.config.configfile import CONFIG
from merlin.log_formatter import FORMATS
from merlin.router import route_for_task
from merlin.utils import nested_namespace_to_dicts


LOG = logging.getLogger(__name__)
Expand All @@ -54,65 +54,47 @@
results_ssl = False
try:
BROKER_URI = broker.get_connection_string()
LOG.info(f"broker: {broker.get_connection_string(include_password=False)}")
LOG.debug(f"broker: {broker.get_connection_string(include_password=False)}")
broker_ssl = broker.get_ssl_config()
LOG.info(f"broker_ssl = {broker_ssl}")
LOG.debug(f"broker_ssl = {broker_ssl}")
RESULTS_BACKEND_URI = results_backend.get_connection_string()
results_ssl = results_backend.get_ssl_config(celery_check=True)
LOG.info(
LOG.debug(
f"results: {results_backend.get_connection_string(include_password=False)}"
)
LOG.info(f"results: redis_backed_use_ssl = {results_ssl}")
LOG.debug(f"results: redis_backed_use_ssl = {results_ssl}")
except ValueError:
# These variables won't be set if running with '--local'.
BROKER_URI = None
RESULTS_BACKEND_URI = None


# initialize app with essential properties
app = Celery(
"merlin",
broker=BROKER_URI,
backend=RESULTS_BACKEND_URI,
broker_use_ssl=broker_ssl,
redis_backend_use_ssl=results_ssl,
task_routes=(route_for_task,),
)


app.conf.update(
task_serializer="pickle", accept_content=["pickle"], result_serializer="pickle"
)

# load merlin config defaults
app.conf.update(**celeryconfig.DICT)

# load config overrides from app.yaml
if (
ben-bay marked this conversation as resolved.
Show resolved Hide resolved
(not hasattr(CONFIG.celery, "override"))
or (CONFIG.celery.override is None)
or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0)
):
LOG.debug("Skipping celery config override; 'celery.override' field is empty.")
else:
LOG.info("Overriding default celery config.")
app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override))

# auto-discover tasks
app.autodiscover_tasks(["merlin.common"])

app.conf.update(
task_acks_late=True,
task_reject_on_worker_lost=True,
task_publish_retry_policy={
"interval_start": 10,
"interval_step": 10,
"interval_max": 60,
},
redis_max_connections=100000,
)

# Set a timeout to acknowledge a task before it's available to grab
# again (default 24 hours).
app.conf.broker_transport_options = {
"visibility_timeout_seconds": CONFIG.celery.visibility_timeout_seconds,
"max_connections": 100,
}

app.conf.update(broker_pool_limit=0)

# Task routing: call our default queue merlin
app.conf.task_routes = (route_for_task,)
app.conf.task_default_queue = "merlin"

# Log formatting
app.conf.worker_log_color = True
app.conf.worker_log_format = FORMATS["DEFAULT"]
app.conf.worker_task_log_format = FORMATS["WORKER"]


@worker_process_init.connect()
def setup(**kwargs):
Expand Down
29 changes: 29 additions & 0 deletions merlin/config/celeryconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Default celery configuration for merlin
"""

from merlin.log_formatter import FORMATS


# Default Celery settings for merlin, keyed by Celery configuration name.
#
# merlin/celery.py loads this mapping with ``app.conf.update(**DICT)`` and
# then applies any user-supplied ``celery.override`` values from app.yaml
# on top of it.
#
# NOTE(review): ``broker_transport_options`` is a nested mapping. The override
# step presumably replaces top-level keys only, so a top-level
# ``visibility_timeout`` override may not reach the nested value below --
# confirm against Celery's configuration semantics.
DICT = {
    # Tasks and results are pickled.
    # NOTE(review): unpickling is only safe when the broker and results
    # backend are trusted -- confirm the deployment guarantees this.
    "task_serializer": "pickle",
    "accept_content": ["pickle"],
    "result_serializer": "pickle",
    "task_acks_late": True,
    "task_reject_on_worker_lost": True,
    # Retry intervals used when publishing a task fails.
    "task_publish_retry_policy": {
        "interval_start": 10,
        "interval_step": 10,
        "interval_max": 60,
    },
    "redis_max_connections": 100000,
    "broker_transport_options": {
        # Timeout to acknowledge a task before it is available to grab
        # again: 24 hours, expressed in seconds.
        "visibility_timeout": 60 * 60 * 24,
        "max_connections": 100,
    },
    "broker_pool_limit": 0,
    # Task routing: the default queue is named "merlin".
    "task_default_queue": "merlin",
    # Log formatting.
    "worker_log_color": True,
    "worker_log_format": FORMATS["DEFAULT"],
    "worker_task_log_format": FORMATS["WORKER"],
}
9 changes: 4 additions & 5 deletions merlin/config/configfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,20 @@ def get_config(path):
return config


def load_default_timeout(config):
seconds = 60 * 60 * 24
def load_default_celery(config):
try:
config["celery"]
except KeyError:
config["celery"] = {}
try:
config["celery"]["visibility_timeout_seconds"]
config["celery"]["override"]
except KeyError:
config["celery"]["visibility_timeout_seconds"] = seconds
config["celery"]["override"] = None


def load_defaults(config):
load_default_user_names(config)
load_default_timeout(config)
load_default_celery(config)


def is_debug():
Expand Down
5 changes: 4 additions & 1 deletion merlin/data/celery/app.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
celery:
ben-bay marked this conversation as resolved.
Show resolved Hide resolved
visibility_timeout_seconds: 86400
# see Celery configuration options
# https://docs.celeryproject.org/en/stable/userguide/configuration.html
override:
visibility_timeout: 86400

broker:
# can be redis, redis+sock, or rabbitmq
Expand Down
5 changes: 4 additions & 1 deletion merlin/data/celery/app_redis.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
celery:
visibility_timeout_seconds: 86400
# see Celery configuration options
# https://docs.celeryproject.org/en/stable/userguide/configuration.html
override:
visibility_timeout: 86400

broker:
# can be redis, redis+sock, or rabbitmq
Expand Down