From f37a100777e96a5809018d3061838fc198e206b9 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Mon, 6 Jul 2020 12:11:25 -0700 Subject: [PATCH 01/19] added default app.yaml, still in progress... --- merlin/celery.py | 3 + merlin/config/app.yaml | 127 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 merlin/config/app.yaml diff --git a/merlin/celery.py b/merlin/celery.py index 51fab12da..a16000e5f 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -104,6 +104,9 @@ app.conf.update(broker_pool_limit=0) +# update all keys at once +# app.conf.update(...) + # Task routing: call our default queue merlin app.conf.task_routes = (route_for_task,) app.conf.task_default_queue = "merlin" diff --git a/merlin/config/app.yaml b/merlin/config/app.yaml new file mode 100644 index 000000000..d3ee73972 --- /dev/null +++ b/merlin/config/app.yaml @@ -0,0 +1,127 @@ +# See https://docs.celeryproject.org/en/stable/userguide/configuration.html + +task_default_queue: merlin +task_acks_late: True +task_reject_on_worker_lost: True +redis_max_connections: 100000 +broker_pool_limit: 0 +task_serializer: pickle +accept_content: ["pickle"] +result_serializer: pickle +task_publish_retry_policy : + interval_start: 10: + interval_step: 10: + interval_max: 60: +broker_transport_options: + visibility_timeout_seconds: 86400 + max_connections: 100 + + +enable_utc: True +imports: [] +include: [] +timezone: UTC +beat_max_loop_interval: 0 +beat_schedule: {} +beat_scheduler: celery.beat:PersistentScheduler +beat_schedule_filename: celerybeat-schedule +beat_sync_every: 0 +broker_url: amqp:// +broker_transport: +broker_connection_timeout: 4.0 +broker_connection_retry: True +broker_connection_max_retries: 100 +broker_failover_strategy: round-robin +broker_heartbeat: +broker_login_method: +broker_use_ssl: +cache_backend: +cache_backend_options: +cassandra_table: +cassandra_entry_ttl: +cassandra_keyspace: +cassandra_port: +cassandra_read_consistency: +cassandra_servers: +cassandra_write_consistency: +cassandra_options: +3_: +s3_access_key_id: +3_: +s3_secret_access_key: +3_: +s3_bucket: +3: +s3_base_path: +3: +s3_endpoint_url: +3_: +s3_region: +couchbase_backend_settings: +arangodb_backend_settings: +mongodb_backend_settings: +event_queue_expires: +event_queue_ttl: +event_queue_prefix: +event_serializer: +redis_db: +redis_host: +redis_password: +redis_port: +redis_backend_use_ssl: +result_backend: +result_cache_max: +result_compression: +result_exchange: +result_exchange_type: +result_expires: +result_persistent: +se result_backend instead.: +database_engine_options: +database_short_lived_sessions: +database_db_names: +security_certificate: +security_cert_store: +security_key: +task_acks_on_failure_or_timeout: True +task_always_eager: False +task_annotations: null +task_compression: null +task_create_missing_queues: True +task_default_delivery_mode: persistent +task_default_exchange: +task_default_exchange_type: direct +task_default_rate_limit: +task_default_routing_key: +task_eager_propagates: +task_ignore_result: +task_publish_retry: +task_queues: +task_routes: +task_send_sent_event: +task_soft_time_limit: +task_time_limit: +task_track_started: +worker_agent: +worker_autoscaler: +worker_concurrency: +worker_consumer: +worker_direct: +worker_disable_rate_limits: +worker_enable_remote_control: +worker_hijack_root_logger: +worker_log_color: +worker_log_format: +worker_lost_wait: +worker_max_tasks_per_child: +worker_pool: +worker_pool_putlocks: +worker_pool_restarts: +worker_prefetch_multiplier: +worker_redirect_stdouts: +worker_redirect_stdouts_level: +worker_send_task_events: +worker_state_db: +worker_task_log_format: +worker_timer: +worker_timer_precision: From 5725dbae182be82ed5d43b2a74add5005befc075 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Mon, 6 Jul 2020 12:29:28 -0700 Subject: [PATCH 02/19] added celery defaults --- merlin/celery.py | 5 + merlin/config/app.yaml | 265 +++++++++++++++++++++++------------------ 2 files changed, 156 insertions(+), 114 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index a16000e5f..b89be00a7 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -104,6 +104,11 @@ app.conf.update(broker_pool_limit=0) +#with open("/g/g13/bay1/app.yaml", "w") as f: +# for k,v in app.conf.__dict__.items(): +# f.write(k) +# f.write(": " + str(v) + "\n") + # update all keys at once # app.conf.update(...) diff --git a/merlin/config/app.yaml b/merlin/config/app.yaml index d3ee73972..e03cbb598 100644 --- a/merlin/config/app.yaml +++ b/merlin/config/app.yaml @@ -1,127 +1,164 @@ -# See https://docs.celeryproject.org/en/stable/userguide/configuration.html - -task_default_queue: merlin -task_acks_late: True -task_reject_on_worker_lost: True -redis_max_connections: 100000 -broker_pool_limit: 0 -task_serializer: pickle -accept_content: ["pickle"] -result_serializer: pickle -task_publish_retry_policy : - interval_start: 10: - interval_step: 10: - interval_max: 60: -broker_transport_options: - visibility_timeout_seconds: 86400 - max_connections: 100 - - +accept_content: ['pickle'] +result_accept_content: null enable_utc: True imports: [] include: [] -timezone: UTC +timezone: null beat_max_loop_interval: 0 beat_schedule: {} -beat_scheduler: celery.beat:PersistentScheduler -beat_schedule_filename: celerybeat-schedule +beat_scheduler: 'celery.beat:PersistentScheduler' +beat_schedule_filename: 'celerybeat-schedule' beat_sync_every: 0 -broker_url: amqp:// -broker_transport: -broker_connection_timeout: 4.0 +broker_url: redacted +broker_read_url: null +broker_write_url: null +broker_transport: null +broker_transport_options: {visibility_timeout_seconds: 86400, max_connections: 100} +broker_connection_timeout: 4 broker_connection_retry: True broker_connection_max_retries: 100 -broker_failover_strategy: round-robin -broker_heartbeat: -broker_login_method: -broker_use_ssl: -cache_backend: -cache_backend_options: -cassandra_table: -cassandra_entry_ttl: -cassandra_keyspace: -cassandra_port: -cassandra_read_consistency: -cassandra_servers: -cassandra_write_consistency: -cassandra_options: -3_: -s3_access_key_id: -3_: -s3_secret_access_key: -3_: -s3_bucket: -3: -s3_base_path: -3: -s3_endpoint_url: -3_: -s3_region: -couchbase_backend_settings: -arangodb_backend_settings: -mongodb_backend_settings: -event_queue_expires: -event_queue_ttl: -event_queue_prefix: -event_serializer: -redis_db: -redis_host: -redis_password: -redis_port: -redis_backend_use_ssl: -result_backend: -result_cache_max: -result_compression: -result_exchange: -result_exchange_type: -result_expires: -result_persistent: -se result_backend instead.: -database_engine_options: -database_short_lived_sessions: -database_db_names: -security_certificate: -security_cert_store: -security_key: +broker_failover_strategy: null +broker_heartbeat: 120 +broker_heartbeat_checkrate: 3.0 +broker_login_method: null +broker_pool_limit: 0 +broker_use_ssl: True +broker_host: null +broker_port: null +broker_user: null +broker_password: null +broker_vhost: null +cache_backend: null +cache_backend_options: {} +cassandra_entry_ttl: null +cassandra_keyspace: null +cassandra_port: null +cassandra_read_consistency: null +cassandra_servers: null +cassandra_table: null +cassandra_write_consistency: null +cassandra_auth_provider: null +cassandra_auth_kwargs: null +cassandra_options: {} +s3_access_key_id: null +s3_secret_access_key: null +s3_bucket: null +s3_base_path: null +s3_endpoint_url: null +s3_region: null +azureblockblob_container_name: 'celery' +azureblockblob_retry_initial_backoff_sec: 2 +azureblockblob_retry_increment_base: 2 +azureblockblob_retry_max_attempts: 3 +control_queue_ttl: 300.0 +control_queue_expires: 10.0 +control_exchange: 'celery' +couchbase_backend_settings: null +arangodb_backend_settings: null +mongodb_backend_settings: null +cosmosdbsql_database_name: 'celerydb' +cosmosdbsql_collection_name: 'celerycol' +cosmosdbsql_consistency_level: 'Session' +cosmosdbsql_max_retry_attempts: 9 +cosmosdbsql_max_retry_wait_time: 30 +event_queue_expires: 60.0 +event_queue_ttl: 5.0 +event_queue_prefix: 'celeryev' +event_serializer: 'json' +event_exchange: 'celeryev' +redis_backend_use_ssl: null +redis_db: null +redis_host: null +redis_max_connections: 100000 +redis_password: null +redis_port: null +redis_socket_timeout: 120.0 +redis_socket_connect_timeout: null +redis_retry_on_timeout: False +redis_socket_keepalive: False +result_backend: redacted +result_cache_max: -1 +result_compression: null +result_exchange: 'celeryresults' +result_exchange_type: 'direct' +result_expires: datetime.timedelta(days=1) +result_persistent: null +result_extended: False +result_serializer: 'pickle' +result_backend_transport_options: {} +result_chord_retry_interval: 1.0 +result_chord_join_timeout: 3.0 +result_backend_max_sleep_between_retries_ms: 10000 +result_backend_max_retries: inf +result_backend_base_sleep_between_retries_ms: 10 +result_backend_always_retry: False +elasticsearch_retry_on_timeout: null +elasticsearch_max_retries: null +elasticsearch_timeout: null +elasticsearch_save_meta_as_text: True +riak_backend_settings: null +security_certificate: null +security_cert_store: null +security_key: null +security_digest: 'sha256' +database_url: null +database_engine_options: null +database_short_lived_sessions: False +database_table_schemas: null +database_table_names: null +task_acks_late: True task_acks_on_failure_or_timeout: True task_always_eager: False task_annotations: null task_compression: null task_create_missing_queues: True -task_default_delivery_mode: persistent -task_default_exchange: -task_default_exchange_type: direct -task_default_rate_limit: -task_default_routing_key: -task_eager_propagates: -task_ignore_result: -task_publish_retry: -task_queues: -task_routes: -task_send_sent_event: -task_soft_time_limit: -task_time_limit: -task_track_started: -worker_agent: -worker_autoscaler: -worker_concurrency: -worker_consumer: -worker_direct: -worker_disable_rate_limits: -worker_enable_remote_control: -worker_hijack_root_logger: -worker_log_color: -worker_log_format: -worker_lost_wait: -worker_max_tasks_per_child: -worker_pool: -worker_pool_putlocks: -worker_pool_restarts: -worker_prefetch_multiplier: -worker_redirect_stdouts: -worker_redirect_stdouts_level: -worker_send_task_events: -worker_state_db: -worker_task_log_format: -worker_timer: -worker_timer_precision: +task_inherit_parent_priority: False +task_default_delivery_mode: 2 +task_default_queue: 'celery' +task_default_exchange: null +task_default_exchange_type: 'direct' +task_default_routing_key: null +task_default_rate_limit: null +task_default_priority: null +task_eager_propagates: False +task_ignore_result: False +task_protocol: 2 +task_publish_retry: True +task_publish_retry_policy: {interval_start: 10, interval_step: 10, interval_max: 60} +task_queues: null +task_queue_ha_policy: null +task_queue_max_priority: null +task_reject_on_worker_lost: True +task_remote_tracebacks: False +task_routes: null +task_send_sent_event: False +task_serializer: 'pickle' +task_soft_time_limit: null +task_time_limit: null +task_store_errors_even_if_ignored: False +task_track_started: False +worker_agent: null +worker_autoscaler: 'celery.worker.autoscale:Autoscaler' +worker_concurrency: 0 +worker_consumer: 'celery.worker.consumer:Consumer' +worker_direct: False +worker_disable_rate_limits: False +worker_enable_remote_control: True +worker_hijack_root_logger: True +worker_log_color: null +worker_log_format: '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s' +worker_lost_wait: 10.0 +worker_max_memory_per_child: null +worker_max_tasks_per_child: null +worker_pool: 'prefork' +worker_pool_putlocks: True +worker_pool_restarts: False +worker_proc_alive_timeout: 4.0 +worker_prefetch_multiplier: 4 +worker_redirect_stdouts: True +worker_redirect_stdouts_level: 'WARNING' +worker_send_task_events: False +worker_state_db: null +worker_task_log_format: '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s' +worker_timer: null +worker_timer_precision: 1.0 From 6295c75065026edcd0c33eade7e7d4d541090536 Mon Sep 17 00:00:00 2001 From: Benjamin Bay <48391872+ben-bay@users.noreply.github.com> Date: Mon, 6 Jul 2020 13:32:36 -0600 Subject: [PATCH 03/19] Update app.yaml --- merlin/config/app.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/merlin/config/app.yaml b/merlin/config/app.yaml index e03cbb598..fecdd39bf 100644 --- a/merlin/config/app.yaml +++ b/merlin/config/app.yaml @@ -1,3 +1,5 @@ +# See https://docs.celeryproject.org/en/stable/userguide/configuration.html + accept_content: ['pickle'] result_accept_content: null enable_utc: True From b5b82e4eda7cbb6dbc0f8d70b5be88ae6044ce94 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Mon, 6 Jul 2020 16:56:37 -0700 Subject: [PATCH 04/19] config override appears to be working --- merlin/celery.py | 64 ++++--------- merlin/config/app.yaml | 164 ---------------------------------- merlin/config/celeryconfig.py | 20 +++++ 3 files changed, 37 insertions(+), 211 deletions(-) delete mode 100644 merlin/config/app.yaml create mode 100644 merlin/config/celeryconfig.py diff --git a/merlin/celery.py b/merlin/celery.py index b89be00a7..f417e20bb 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -40,10 +40,11 @@ from celery.signals import worker_process_init import merlin.common.security.encrypt_backend_traffic -from merlin.config import broker, results_backend +from merlin.config import broker, results_backend, celeryconfig from merlin.config.configfile import CONFIG from merlin.log_formatter import FORMATS from merlin.router import route_for_task +from merlin.utils import nested_namespace_to_dicts LOG = logging.getLogger(__name__) @@ -68,59 +69,28 @@ BROKER_URI = None RESULTS_BACKEND_URI = None +app = Celery("merlin") -app = Celery( - "merlin", - broker=BROKER_URI, - backend=RESULTS_BACKEND_URI, - broker_use_ssl=broker_ssl, - redis_backend_use_ssl=results_ssl, -) +# load merlin config defaults +app.config_from_object(celeryconfig) +# load config overrides from app.yaml +app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override)) +# overwrite config with essential properties app.conf.update( - task_serializer="pickle", accept_content=["pickle"], result_serializer="pickle" + broker = BROKER_URI, + backend = RESULTS_BACKEND_URI, + broker_use_ssl = broker_ssl, + redis_backend_use_ssl = results_ssl, + task_routes = (route_for_task,), + task_default_queue = "merlin", + worker_log_color = True, + worker_log_format = FORMATS["DEFAULT"], + worker_task_log_format = FORMATS["WORKER"], ) - app.autodiscover_tasks(["merlin.common"]) -app.conf.update( - task_acks_late=True, - task_reject_on_worker_lost=True, - task_publish_retry_policy={ - "interval_start": 10, - "interval_step": 10, - "interval_max": 60, - }, - redis_max_connections=100000, -) - -# Set a timeout to acknowledge a task before it's available to grab -# again (default 24 hours). -app.conf.broker_transport_options = { - "visibility_timeout_seconds": CONFIG.celery.visibility_timeout_seconds, - "max_connections": 100, -} - -app.conf.update(broker_pool_limit=0) - -#with open("/g/g13/bay1/app.yaml", "w") as f: -# for k,v in app.conf.__dict__.items(): -# f.write(k) -# f.write(": " + str(v) + "\n") - -# update all keys at once -# app.conf.update(...) - -# Task routing: call our default queue merlin -app.conf.task_routes = (route_for_task,) -app.conf.task_default_queue = "merlin" - -# Log formatting -app.conf.worker_log_color = True -app.conf.worker_log_format = FORMATS["DEFAULT"] -app.conf.worker_task_log_format = FORMATS["WORKER"] - @worker_process_init.connect() def setup(**kwargs): diff --git a/merlin/config/app.yaml b/merlin/config/app.yaml deleted file mode 100644 index e03cbb598..000000000 --- a/merlin/config/app.yaml +++ /dev/null @@ -1,164 +0,0 @@ -accept_content: ['pickle'] -result_accept_content: null -enable_utc: True -imports: [] -include: [] -timezone: null -beat_max_loop_interval: 0 -beat_schedule: {} -beat_scheduler: 'celery.beat:PersistentScheduler' -beat_schedule_filename: 'celerybeat-schedule' -beat_sync_every: 0 -broker_url: redacted -broker_read_url: null -broker_write_url: null -broker_transport: null -broker_transport_options: {visibility_timeout_seconds: 86400, max_connections: 100} -broker_connection_timeout: 4 -broker_connection_retry: True -broker_connection_max_retries: 100 -broker_failover_strategy: null -broker_heartbeat: 120 -broker_heartbeat_checkrate: 3.0 -broker_login_method: null -broker_pool_limit: 0 -broker_use_ssl: True -broker_host: null -broker_port: null -broker_user: null -broker_password: null -broker_vhost: null -cache_backend: null -cache_backend_options: {} -cassandra_entry_ttl: null -cassandra_keyspace: null -cassandra_port: null -cassandra_read_consistency: null -cassandra_servers: null -cassandra_table: null -cassandra_write_consistency: null -cassandra_auth_provider: null -cassandra_auth_kwargs: null -cassandra_options: {} -s3_access_key_id: null -s3_secret_access_key: null -s3_bucket: null -s3_base_path: null -s3_endpoint_url: null -s3_region: null -azureblockblob_container_name: 'celery' -azureblockblob_retry_initial_backoff_sec: 2 -azureblockblob_retry_increment_base: 2 -azureblockblob_retry_max_attempts: 3 -control_queue_ttl: 300.0 -control_queue_expires: 10.0 -control_exchange: 'celery' -couchbase_backend_settings: null -arangodb_backend_settings: null -mongodb_backend_settings: null -cosmosdbsql_database_name: 'celerydb' -cosmosdbsql_collection_name: 'celerycol' -cosmosdbsql_consistency_level: 'Session' -cosmosdbsql_max_retry_attempts: 9 -cosmosdbsql_max_retry_wait_time: 30 -event_queue_expires: 60.0 -event_queue_ttl: 5.0 -event_queue_prefix: 'celeryev' -event_serializer: 'json' -event_exchange: 'celeryev' -redis_backend_use_ssl: null -redis_db: null -redis_host: null -redis_max_connections: 100000 -redis_password: null -redis_port: null -redis_socket_timeout: 120.0 -redis_socket_connect_timeout: null -redis_retry_on_timeout: False -redis_socket_keepalive: False -result_backend: redacted -result_cache_max: -1 -result_compression: null -result_exchange: 'celeryresults' -result_exchange_type: 'direct' -result_expires: datetime.timedelta(days=1) -result_persistent: null -result_extended: False -result_serializer: 'pickle' -result_backend_transport_options: {} -result_chord_retry_interval: 1.0 -result_chord_join_timeout: 3.0 -result_backend_max_sleep_between_retries_ms: 10000 -result_backend_max_retries: inf -result_backend_base_sleep_between_retries_ms: 10 -result_backend_always_retry: False -elasticsearch_retry_on_timeout: null -elasticsearch_max_retries: null -elasticsearch_timeout: null -elasticsearch_save_meta_as_text: True -riak_backend_settings: null -security_certificate: null -security_cert_store: null -security_key: null -security_digest: 'sha256' -database_url: null -database_engine_options: null -database_short_lived_sessions: False -database_table_schemas: null -database_table_names: null -task_acks_late: True -task_acks_on_failure_or_timeout: True -task_always_eager: False -task_annotations: null -task_compression: null -task_create_missing_queues: True -task_inherit_parent_priority: False -task_default_delivery_mode: 2 -task_default_queue: 'celery' -task_default_exchange: null -task_default_exchange_type: 'direct' -task_default_routing_key: null -task_default_rate_limit: null -task_default_priority: null -task_eager_propagates: False -task_ignore_result: False -task_protocol: 2 -task_publish_retry: True -task_publish_retry_policy: {interval_start: 10, interval_step: 10, interval_max: 60} -task_queues: null -task_queue_ha_policy: null -task_queue_max_priority: null -task_reject_on_worker_lost: True -task_remote_tracebacks: False -task_routes: null -task_send_sent_event: False -task_serializer: 'pickle' -task_soft_time_limit: null -task_time_limit: null -task_store_errors_even_if_ignored: False -task_track_started: False -worker_agent: null -worker_autoscaler: 'celery.worker.autoscale:Autoscaler' -worker_concurrency: 0 -worker_consumer: 'celery.worker.consumer:Consumer' -worker_direct: False -worker_disable_rate_limits: False -worker_enable_remote_control: True -worker_hijack_root_logger: True -worker_log_color: null -worker_log_format: '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s' -worker_lost_wait: 10.0 -worker_max_memory_per_child: null -worker_max_tasks_per_child: null -worker_pool: 'prefork' -worker_pool_putlocks: True -worker_pool_restarts: False -worker_proc_alive_timeout: 4.0 -worker_prefetch_multiplier: 4 -worker_redirect_stdouts: True -worker_redirect_stdouts_level: 'WARNING' -worker_send_task_events: False -worker_state_db: null -worker_task_log_format: '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s' -worker_timer: null -worker_timer_precision: 1.0 diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py new file mode 100644 index 000000000..87eebee84 --- /dev/null +++ b/merlin/config/celeryconfig.py @@ -0,0 +1,20 @@ +""" +Default celery configuration for merlin +""" + +task_serializer = "pickle" +accept_content = ["pickle"] +result_serializer = "pickle" +task_acks_late = True +task_reject_on_worker_lost = True +task_publish_retry_policy = { + "interval_start": 10, + "interval_step": 10, + "interval_max": 60, +} +redis_max_connections = 100000 +broker_transport_options = { + "visibility_timeout_seconds": 60 * 60 * 24, + "max_connections": 100, +} +broker_pool_limit = 0 From d2531668f2fec55db399577c62314f20b5c4406c Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Mon, 6 Jul 2020 16:59:38 -0700 Subject: [PATCH 05/19] corrected value name --- merlin/config/celeryconfig.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py index 87eebee84..108a13fa1 100644 --- a/merlin/config/celeryconfig.py +++ b/merlin/config/celeryconfig.py @@ -14,7 +14,7 @@ } redis_max_connections = 100000 broker_transport_options = { - "visibility_timeout_seconds": 60 * 60 * 24, + "visibility_timeout": 60 * 60 * 24, "max_connections": 100, } broker_pool_limit = 0 From 9b3ee15c36dcd2eb4b5c4111905ef2a7defe9caf Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 08:46:10 -0700 Subject: [PATCH 06/19] shifted some properties to be optional defaults --- merlin/celery.py | 5 ----- merlin/config/celeryconfig.py | 7 +++++++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index f417e20bb..03c1a71e5 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -42,7 +42,6 @@ import merlin.common.security.encrypt_backend_traffic from merlin.config import broker, results_backend, celeryconfig from merlin.config.configfile import CONFIG -from merlin.log_formatter import FORMATS from merlin.router import route_for_task from merlin.utils import nested_namespace_to_dicts @@ -84,10 +83,6 @@ broker_use_ssl = broker_ssl, redis_backend_use_ssl = results_ssl, task_routes = (route_for_task,), - task_default_queue = "merlin", - worker_log_color = True, - worker_log_format = FORMATS["DEFAULT"], - worker_task_log_format = FORMATS["WORKER"], ) app.autodiscover_tasks(["merlin.common"]) diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py index 108a13fa1..23934f84f 100644 --- a/merlin/config/celeryconfig.py +++ b/merlin/config/celeryconfig.py @@ -2,6 +2,9 @@ Default celery configuration for merlin """ +from merlin.log_formatter import FORMATS + + task_serializer = "pickle" accept_content = ["pickle"] result_serializer = "pickle" @@ -18,3 +21,7 @@ "max_connections": 100, } broker_pool_limit = 0 +task_default_queue = "merlin", +worker_log_color = True, +worker_log_format = FORMATS["DEFAULT"], +worker_task_log_format = FORMATS["WORKER"], From 80231ab8dc97d9c8b3165cf2920e08c2e8017347 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 08:47:07 -0700 Subject: [PATCH 07/19] removed unneeded file --- merlin/config/app.yaml | 166 ----------------------------------------- 1 file changed, 166 deletions(-) delete mode 100644 merlin/config/app.yaml diff --git a/merlin/config/app.yaml b/merlin/config/app.yaml deleted file mode 100644 index fecdd39bf..000000000 --- a/merlin/config/app.yaml +++ /dev/null @@ -1,166 +0,0 @@ -# See https://docs.celeryproject.org/en/stable/userguide/configuration.html - -accept_content: ['pickle'] -result_accept_content: null -enable_utc: True -imports: [] -include: [] -timezone: null -beat_max_loop_interval: 0 -beat_schedule: {} -beat_scheduler: 'celery.beat:PersistentScheduler' -beat_schedule_filename: 'celerybeat-schedule' -beat_sync_every: 0 -broker_url: redacted -broker_read_url: null -broker_write_url: null -broker_transport: null -broker_transport_options: {visibility_timeout_seconds: 86400, max_connections: 100} -broker_connection_timeout: 4 -broker_connection_retry: True -broker_connection_max_retries: 100 -broker_failover_strategy: null -broker_heartbeat: 120 -broker_heartbeat_checkrate: 3.0 -broker_login_method: null -broker_pool_limit: 0 -broker_use_ssl: True -broker_host: null -broker_port: null -broker_user: null -broker_password: null -broker_vhost: null -cache_backend: null -cache_backend_options: {} -cassandra_entry_ttl: null -cassandra_keyspace: null -cassandra_port: null -cassandra_read_consistency: null -cassandra_servers: null -cassandra_table: null -cassandra_write_consistency: null -cassandra_auth_provider: null -cassandra_auth_kwargs: null -cassandra_options: {} -s3_access_key_id: null -s3_secret_access_key: null -s3_bucket: null -s3_base_path: null -s3_endpoint_url: null -s3_region: null -azureblockblob_container_name: 'celery' -azureblockblob_retry_initial_backoff_sec: 2 -azureblockblob_retry_increment_base: 2 -azureblockblob_retry_max_attempts: 3 -control_queue_ttl: 300.0 -control_queue_expires: 10.0 -control_exchange: 'celery' -couchbase_backend_settings: null -arangodb_backend_settings: null -mongodb_backend_settings: null -cosmosdbsql_database_name: 'celerydb' -cosmosdbsql_collection_name: 'celerycol' -cosmosdbsql_consistency_level: 'Session' -cosmosdbsql_max_retry_attempts: 9 -cosmosdbsql_max_retry_wait_time: 30 -event_queue_expires: 60.0 -event_queue_ttl: 5.0 -event_queue_prefix: 'celeryev' -event_serializer: 'json' -event_exchange: 'celeryev' -redis_backend_use_ssl: null -redis_db: null -redis_host: null -redis_max_connections: 100000 -redis_password: null -redis_port: null -redis_socket_timeout: 120.0 -redis_socket_connect_timeout: null -redis_retry_on_timeout: False -redis_socket_keepalive: False -result_backend: redacted -result_cache_max: -1 -result_compression: null -result_exchange: 'celeryresults' -result_exchange_type: 'direct' -result_expires: datetime.timedelta(days=1) -result_persistent: null -result_extended: False -result_serializer: 'pickle' -result_backend_transport_options: {} -result_chord_retry_interval: 1.0 -result_chord_join_timeout: 3.0 -result_backend_max_sleep_between_retries_ms: 10000 -result_backend_max_retries: inf -result_backend_base_sleep_between_retries_ms: 10 -result_backend_always_retry: False -elasticsearch_retry_on_timeout: null -elasticsearch_max_retries: null -elasticsearch_timeout: null -elasticsearch_save_meta_as_text: True -riak_backend_settings: null -security_certificate: null -security_cert_store: null -security_key: null -security_digest: 'sha256' -database_url: null -database_engine_options: null -database_short_lived_sessions: False -database_table_schemas: null -database_table_names: null -task_acks_late: True -task_acks_on_failure_or_timeout: True -task_always_eager: False -task_annotations: null -task_compression: null -task_create_missing_queues: True -task_inherit_parent_priority: False -task_default_delivery_mode: 2 -task_default_queue: 'celery' -task_default_exchange: null -task_default_exchange_type: 'direct' -task_default_routing_key: null -task_default_rate_limit: null -task_default_priority: null -task_eager_propagates: False -task_ignore_result: False -task_protocol: 2 -task_publish_retry: True -task_publish_retry_policy: {interval_start: 10, interval_step: 10, interval_max: 60} -task_queues: null -task_queue_ha_policy: null -task_queue_max_priority: null -task_reject_on_worker_lost: True -task_remote_tracebacks: False -task_routes: null -task_send_sent_event: False -task_serializer: 'pickle' -task_soft_time_limit: null -task_time_limit: null -task_store_errors_even_if_ignored: False -task_track_started: False -worker_agent: null -worker_autoscaler: 'celery.worker.autoscale:Autoscaler' -worker_concurrency: 0 -worker_consumer: 'celery.worker.consumer:Consumer' -worker_direct: False -worker_disable_rate_limits: False -worker_enable_remote_control: True -worker_hijack_root_logger: True -worker_log_color: null -worker_log_format: '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s' -worker_lost_wait: 10.0 -worker_max_memory_per_child: null -worker_max_tasks_per_child: null -worker_pool: 'prefork' -worker_pool_putlocks: True -worker_pool_restarts: False -worker_proc_alive_timeout: 4.0 -worker_prefetch_multiplier: 4 -worker_redirect_stdouts: True -worker_redirect_stdouts_level: 'WARNING' -worker_send_task_events: False -worker_state_db: null -worker_task_log_format: '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s' -worker_timer: null -worker_timer_precision: 1.0 From fefae5d29a46aa8262ca49c290563587131f160b Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 09:12:04 -0700 Subject: [PATCH 08/19] improvements --- merlin/celery.py | 14 +++++++++----- merlin/config/configfile.py | 9 ++++----- merlin/data/celery/app.yaml | 5 ++++- merlin/data/celery/app_redis.yaml | 5 ++++- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index 03c1a71e5..b0a5e5b1c 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -54,15 +54,15 @@ results_ssl = False try: BROKER_URI = broker.get_connection_string() - LOG.info(f"broker: {broker.get_connection_string(include_password=False)}") + LOG.debug(f"broker: {broker.get_connection_string(include_password=False)}") broker_ssl = broker.get_ssl_config() - LOG.info(f"broker_ssl = {broker_ssl}") + LOG.debug(f"broker_ssl = {broker_ssl}") RESULTS_BACKEND_URI = results_backend.get_connection_string() results_ssl = results_backend.get_ssl_config(celery_check=True) - LOG.info( + LOG.debug( f"results: {results_backend.get_connection_string(include_password=False)}" ) - LOG.info(f"results: redis_backed_use_ssl = {results_ssl}") + LOG.debug(f"results: redis_backed_use_ssl = {results_ssl}") except ValueError: # These variables won't be set if running with '--local'. BROKER_URI = None @@ -74,7 +74,11 @@ app.config_from_object(celeryconfig) # load config overrides from app.yaml -app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override)) +if (not hasattr(CONFIG.celery, "override")) or (CONFIG.celery.override is None) or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0): + LOG.info("Skipping celery config override; 'celery.override' field is empty.") +else: + LOG.info("Overriding default celery config.") + app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override)) # overwrite config with essential properties app.conf.update( diff --git a/merlin/config/configfile.py b/merlin/config/configfile.py index 313025591..fab793cec 100644 --- a/merlin/config/configfile.py +++ b/merlin/config/configfile.py @@ -129,21 +129,20 @@ def get_config(path): return config -def load_default_timeout(config): - seconds = 60 * 60 * 24 +def load_default_celery(config): try: config["celery"] except KeyError: config["celery"] = {} try: - config["celery"]["visibility_timeout_seconds"] + config["celery"]["override"] except KeyError: - config["celery"]["visibility_timeout_seconds"] = seconds + config["celery"]["override"] = None def load_defaults(config): load_default_user_names(config) - load_default_timeout(config) + load_default_celery(config) def is_debug(): diff --git a/merlin/data/celery/app.yaml b/merlin/data/celery/app.yaml index e4631b75a..3b37c09fb 100644 --- a/merlin/data/celery/app.yaml +++ b/merlin/data/celery/app.yaml @@ -1,5 +1,8 @@ celery: - visibility_timeout_seconds: 86400 + # see Celery configuration options + # https://docs.celeryproject.org/en/stable/userguide/configuration.html + override: + visibility_timeout: 86400 broker: # can be redis, redis+sock, or rabbitmq diff --git a/merlin/data/celery/app_redis.yaml b/merlin/data/celery/app_redis.yaml index ecee7d398..1b08313ad 100644 --- a/merlin/data/celery/app_redis.yaml +++ b/merlin/data/celery/app_redis.yaml @@ -1,5 +1,8 @@ celery: - visibility_timeout_seconds: 86400 + # see Celery configuration options + # https://docs.celeryproject.org/en/stable/userguide/configuration.html + override: + visibility_timeout: 86400 broker: # can be redis, redis+sock, or rabbitmq From a0d6b690d915d6e40be952a834319ebc67ffa7c0 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 09:13:03 -0700 Subject: [PATCH 09/19] fixed style --- merlin/celery.py | 18 +++++++++++------- merlin/config/celeryconfig.py | 8 ++++---- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index b0a5e5b1c..6372d4e38 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -40,7 +40,7 @@ from celery.signals import worker_process_init import merlin.common.security.encrypt_backend_traffic -from merlin.config import broker, results_backend, celeryconfig +from merlin.config import broker, celeryconfig, results_backend from merlin.config.configfile import CONFIG from merlin.router import route_for_task from merlin.utils import nested_namespace_to_dicts @@ -74,7 +74,11 @@ app.config_from_object(celeryconfig) # load config overrides from app.yaml -if (not hasattr(CONFIG.celery, "override")) or (CONFIG.celery.override is None) or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0): +if ( + (not hasattr(CONFIG.celery, "override")) + or (CONFIG.celery.override is None) + or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0) +): LOG.info("Skipping celery config override; 'celery.override' field is empty.") else: LOG.info("Overriding default celery config.") @@ -82,11 +86,11 @@ # overwrite config with essential properties app.conf.update( - broker = BROKER_URI, - backend = RESULTS_BACKEND_URI, - broker_use_ssl = broker_ssl, - redis_backend_use_ssl = results_ssl, - task_routes = (route_for_task,), + broker=BROKER_URI, + backend=RESULTS_BACKEND_URI, + broker_use_ssl=broker_ssl, + redis_backend_use_ssl=results_ssl, + task_routes=(route_for_task,), ) app.autodiscover_tasks(["merlin.common"]) diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py index 23934f84f..80ecc8bcb 100644 --- a/merlin/config/celeryconfig.py +++ b/merlin/config/celeryconfig.py @@ -21,7 +21,7 @@ "max_connections": 100, } broker_pool_limit = 0 -task_default_queue = "merlin", -worker_log_color = True, -worker_log_format = FORMATS["DEFAULT"], -worker_task_log_format = FORMATS["WORKER"], +task_default_queue = ("merlin",) +worker_log_color = (True,) +worker_log_format = (FORMATS["DEFAULT"],) +worker_task_log_format = (FORMATS["WORKER"],) From 4f8cf997156fd7e8ca9c22b6ad3b76f75218e566 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 09:25:50 -0700 Subject: [PATCH 10/19] fixed style --- merlin/celery.py | 2 +- merlin/config/celeryconfig.py | 40 ++++++++++++++++++----------------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index 6372d4e38..f8fa3f68b 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -71,7 +71,7 @@ app = Celery("merlin") # load merlin config defaults -app.config_from_object(celeryconfig) +app.conf.update(**celeryconfig.DICT) # load config overrides from app.yaml if ( diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py index 80ecc8bcb..14da1b022 100644 --- a/merlin/config/celeryconfig.py +++ b/merlin/config/celeryconfig.py @@ -5,23 +5,25 @@ from merlin.log_formatter import FORMATS -task_serializer = "pickle" -accept_content = ["pickle"] -result_serializer = "pickle" -task_acks_late = True -task_reject_on_worker_lost = True -task_publish_retry_policy = { - "interval_start": 10, - "interval_step": 10, - "interval_max": 60, +DICT = { + "task_serializer": "pickle", + "accept_content": ["pickle"], + "result_serializer": "pickle", + "task_acks_late": True, + "task_reject_on_worker_lost": True, + "task_publish_retry_policy": { + "interval_start": 10, + "interval_step": 10, + "interval_max": 60, + }, + "redis_max_connections": 100000, + "broker_transport_options": { + "visibility_timeout": 60 * 60 * 24, + "max_connections": 100, + }, + "broker_pool_limit": 0, + "task_default_queue": ("merlin",), + "worker_log_color": (True,), + "worker_log_format": (FORMATS["DEFAULT"],), + "worker_task_log_format": (FORMATS["WORKER"],), } -redis_max_connections = 100000 -broker_transport_options = { - "visibility_timeout": 60 * 60 * 24, - "max_connections": 100, -} -broker_pool_limit = 0 -task_default_queue = ("merlin",) -worker_log_color = (True,) -worker_log_format = (FORMATS["DEFAULT"],) -worker_task_log_format = (FORMATS["WORKER"],) From dee7f217c167fd4480041118946d2cbe3d64e49d Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 10:25:32 -0700 Subject: [PATCH 11/19] hid 'no override' message --- merlin/celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/celery.py b/merlin/celery.py index f8fa3f68b..604fc5e51 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -79,7 +79,7 @@ or (CONFIG.celery.override is None) or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0) ): - LOG.info("Skipping celery config override; 'celery.override' field is empty.") + LOG.debug("Skipping celery config override; 'celery.override' field is empty.") else: LOG.info("Overriding default celery config.") app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override)) From ae84e2f139c40036804c069c8218ce23b971f008 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 14:27:28 -0700 Subject: [PATCH 12/19] tweaked var names --- merlin/celery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index 604fc5e51..068b0505e 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -86,8 +86,8 @@ # overwrite config with essential properties app.conf.update( - broker=BROKER_URI, - backend=RESULTS_BACKEND_URI, + broker_url=BROKER_URI, + result_backend=RESULTS_BACKEND_URI, broker_use_ssl=broker_ssl, redis_backend_use_ssl=results_ssl, task_routes=(route_for_task,), From fcafb62d22d63559d9c17b4baf3187211f6ecdfb Mon Sep 17 00:00:00 2001 From: Benjamin Bay <48391872+ben-bay@users.noreply.github.com> Date: Tue, 7 Jul 2020 15:28:26 -0600 Subject: [PATCH 13/19] added 2 tests for celery app (#246) --- tests/test_celery.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 tests/test_celery.py diff --git a/tests/test_celery.py b/tests/test_celery.py new file mode 100644 index 000000000..e39fac194 --- /dev/null +++ b/tests/test_celery.py @@ -0,0 +1,17 @@ +import re + +def test_broker_url(): + """ + Ensure the celery application 'broker_url' roughly matches the required pattern. + """ + from merlin.celery import app + assert re.match(r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url) + + +def test_result_backend(): + """ + Ensure the celery application 'result_backend' roughly matches the required pattern. + """ + from merlin.celery import app + assert re.match(r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url) + From c94572a65c52ea97d51b4d708ca96ead11ebdeb8 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 14:32:56 -0700 Subject: [PATCH 14/19] fixed test --- tests/test_celery.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_celery.py b/tests/test_celery.py index e39fac194..d724833c9 100644 --- a/tests/test_celery.py +++ b/tests/test_celery.py @@ -1,10 +1,10 @@ import re +from merlin.celery import app def test_broker_url(): """ Ensure the celery application 'broker_url' roughly matches the required pattern. """ - from merlin.celery import app assert re.match(r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url) @@ -12,6 +12,5 @@ def test_result_backend(): """ Ensure the celery application 'result_backend' roughly matches the required pattern. """ - from merlin.celery import app - assert re.match(r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url) + assert re.match(r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.result_backend) From 79aadc5a16bee20320c88fee4038c07079d587ef Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Tue, 7 Jul 2020 14:54:07 -0700 Subject: [PATCH 15/19] fixed workers bug --- CHANGELOG.md | 1 + merlin/celery.py | 19 ++++++++++--------- merlin/config/celeryconfig.py | 8 ++++---- tests/test_celery.py | 11 ++++++++--- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 23571ffbf..ae244cfde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - The ability to override any value of the celery configuration thru `app.yaml` in `celery.override`. +- 2 unit tests for the format of broker url and result backend. ## [1.6.2] diff --git a/merlin/celery.py b/merlin/celery.py index 068b0505e..ecad3dfea 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -68,7 +68,15 @@ BROKER_URI = None RESULTS_BACKEND_URI = None -app = Celery("merlin") +# initialize app with essential properties +app = Celery( + "merlin", + broker=BROKER_URI, + backend=RESULTS_BACKEND_URI, + broker_use_ssl=broker_ssl, + redis_backend_use_ssl=results_ssl, + task_routes=(route_for_task,), +) # load merlin config defaults app.conf.update(**celeryconfig.DICT) @@ -84,14 +92,7 @@ LOG.info("Overriding default celery config.") app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override)) -# overwrite config with essential properties -app.conf.update( - broker_url=BROKER_URI, - result_backend=RESULTS_BACKEND_URI, - broker_use_ssl=broker_ssl, - redis_backend_use_ssl=results_ssl, - task_routes=(route_for_task,), -) +# auto-discover tasks app.autodiscover_tasks(["merlin.common"]) diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py index 14da1b022..bf58602d5 100644 --- a/merlin/config/celeryconfig.py +++ b/merlin/config/celeryconfig.py @@ -22,8 +22,8 @@ "max_connections": 100, }, "broker_pool_limit": 0, - "task_default_queue": ("merlin",), - "worker_log_color": (True,), - "worker_log_format": (FORMATS["DEFAULT"],), - "worker_task_log_format": (FORMATS["WORKER"],), + "task_default_queue": "merlin", + "worker_log_color": True, + "worker_log_format": FORMATS["DEFAULT"], + "worker_task_log_format": FORMATS["WORKER"], } diff --git a/tests/test_celery.py b/tests/test_celery.py index d724833c9..f4056fb40 100644 --- a/tests/test_celery.py +++ b/tests/test_celery.py @@ -1,16 +1,21 @@ import re + from merlin.celery import app + def test_broker_url(): """ Ensure the celery application 'broker_url' roughly matches the required pattern. """ - assert re.match(r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url) + assert re.match( + r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url + ) def test_result_backend(): """ Ensure the celery application 'result_backend' roughly matches the required pattern. """ - assert re.match(r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.result_backend) - + assert re.match( + r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.result_backend + ) From 2265af263cc9d67cda867e7a1064b40c7fd7d5e3 Mon Sep 17 00:00:00 2001 From: Bay Date: Wed, 8 Jul 2020 12:34:44 -0600 Subject: [PATCH 16/19] fixed unit tests for running offline --- tests/test_celery.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/test_celery.py b/tests/test_celery.py index f4056fb40..b2af0d249 100644 --- a/tests/test_celery.py +++ b/tests/test_celery.py @@ -7,15 +7,17 @@ def test_broker_url(): """ Ensure the celery application 'broker_url' roughly matches the required pattern. """ - assert re.match( - r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url - ) + if app.conf.broker_url: + assert re.match( + r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url + ) def test_result_backend(): """ Ensure the celery application 'result_backend' roughly matches the required pattern. """ - assert re.match( - r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.result_backend - ) + if app.conf.broker_url: + assert re.match( + r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.result_backend + ) From a3435d19cac273db80f0221a5b49eb5eb4066b88 Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Thu, 9 Jul 2020 10:18:35 -0700 Subject: [PATCH 17/19] removed 2 unit tests --- tests/test_celery.py | 23 ----------------------- 1 file changed, 23 deletions(-) delete mode 100644 tests/test_celery.py diff --git a/tests/test_celery.py b/tests/test_celery.py deleted file mode 100644 index b2af0d249..000000000 --- a/tests/test_celery.py +++ /dev/null @@ -1,23 +0,0 @@ -import re - -from merlin.celery import app - - -def test_broker_url(): - """ - Ensure the celery application 'broker_url' roughly matches the required pattern. - """ - if app.conf.broker_url: - assert re.match( - r"amqps:\/\/\w+:.+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.broker_url - ) - - -def test_result_backend(): - """ - Ensure the celery application 'result_backend' roughly matches the required pattern. - """ - if app.conf.broker_url: - assert re.match( - r"redis:\/\/\w+:\w+@jackalope\.llnl\.gov:\d+\/\w+", app.conf.result_backend - ) From 7c7218811ad9d1ca544e02ca41130465e5d3af8e Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Thu, 9 Jul 2020 11:01:09 -0700 Subject: [PATCH 18/19] fixed CHANGELOG --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ae244cfde..23571ffbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - The ability to override any value of the celery configuration thru `app.yaml` in `celery.override`. -- 2 unit tests for the format of broker url and result backend. ## [1.6.2] From 2f57466659435a7eb45005e4602bfc34210f66ae Mon Sep 17 00:00:00 2001 From: Benjamin Bay Date: Wed, 22 Jul 2020 11:41:39 -0700 Subject: [PATCH 19/19] added info print of all celery configurations being overridden --- merlin/celery.py | 16 +++++++-- .../workflows/flux/scripts/flux_info.py | 36 +++++++++++-------- merlin/spec/expansion.py | 1 + merlin/spec/specification.py | 5 ++- tests/integration/run_tests.py | 4 ++- 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/merlin/celery.py b/merlin/celery.py index ecad3dfea..72f047792 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -89,8 +89,20 @@ ): LOG.debug("Skipping celery config override; 'celery.override' field is empty.") else: - LOG.info("Overriding default celery config.") - app.conf.update(**nested_namespace_to_dicts(CONFIG.celery.override)) + override_dict = nested_namespace_to_dicts(CONFIG.celery.override) + override_str = "" + i = 0 + for k, v in override_dict.items(): + if k not in str(app.conf.__dict__): + raise ValueError(f"'{k}' is not a celery configuration.") + override_str += f"\t{k}:\t{v}" + if i != len(override_dict) - 1: + override_str += "\n" + i += 1 + LOG.info( + f"Overriding default celery config with 'celery.override' in 'app.yaml':\n{override_str}" + ) + app.conf.update(**override_dict) # auto-discover tasks app.autodiscover_tasks(["merlin.common"]) diff --git a/merlin/examples/workflows/flux/scripts/flux_info.py b/merlin/examples/workflows/flux/scripts/flux_info.py index 5995bd127..a2e1efe9b 100755 --- a/merlin/examples/workflows/flux/scripts/flux_info.py +++ b/merlin/examples/workflows/flux/scripts/flux_info.py @@ -60,42 +60,50 @@ qwall = "{0}.walltime".format(fdir) wall_time = kvs.get(f, qwall) - print(f"Job {d[0]}: create: {create_time} start {start_time} run {start_time} completing {completing_time} complete {complete_time} wall {wall_time}") + print( + f"Job {d[0]}: create: {create_time} start {start_time} run {start_time} completing {completing_time} complete {complete_time} wall {wall_time}" + ) except BaseException: pass -except : - top_dir="job" +except: + top_dir = "job" def get_data_dict(key): - kwargs = {"env": os.environ, "shell": True, "universal_newlines":True, "stdout":subprocess.PIPE,"stderr":subprocess.PIPE} + kwargs = { + "env": os.environ, + "shell": True, + "universal_newlines": True, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } flux_com = f"flux kvs get {key}" p = subprocess.Popen(flux_com, **kwargs) stdout, stderr = p.communicate() - + data = {} - for l in stdout.split('/n'): + for l in stdout.split("/n"): for s in l.strip().split(): if "timestamp" in s: - jstring = s.replace("'", "\"") + jstring = s.replace("'", '"') d = json.loads(jstring) - data[d['name']] = d['timestamp'] - - return data + data[d["name"]] = d["timestamp"] + return data for d in kvs.walk(top_dir, flux_handle=f): if "exec" in d[0]: for e in d[2]: - key = ".".join([top_dir,d[0],e]) + key = ".".join([top_dir, d[0], e]) - # This is currently not working gives + # This is currently not working gives # json.decoder.JSONDecodeError # data = kvs.get(f, key) data = get_data_dict(key) - print(f"Job {d[0]}: init: {data['init']} start {data['shell.start']} complete {data['complete']} done {data['done']} ") - + print( + f"Job {d[0]}: init: {data['init']} start {data['shell.start']} complete {data['complete']} done {data['done']} " + ) # vi: ts=4 sw=4 expandtab diff --git a/merlin/spec/expansion.py b/merlin/spec/expansion.py index 33f15ea25..012cb546e 100644 --- a/merlin/spec/expansion.py +++ b/merlin/spec/expansion.py @@ -114,6 +114,7 @@ def expand_env_vars(spec): for values with the key 'cmd' or 'restart' (these are executable shell scripts, so environment variable expansion would be redundant). """ + def recurse(section): if section is None: return section diff --git a/merlin/spec/specification.py b/merlin/spec/specification.py index 68c0888aa..d7baa1c53 100644 --- a/merlin/spec/specification.py +++ b/merlin/spec/specification.py @@ -245,7 +245,10 @@ def dict_to_yaml(obj, string, key_stack, newline=True): key_stack.append("elem") if use_hyphens: string += ( - (lvl + 1) * tab + "- " + str(dict_to_yaml(elem, "", key_stack)) + "\n" + (lvl + 1) * tab + + "- " + + str(dict_to_yaml(elem, "", key_stack)) + + "\n" ) else: string += str( diff --git a/tests/integration/run_tests.py b/tests/integration/run_tests.py index 1f8964c1a..09b17a1e7 100644 --- a/tests/integration/run_tests.py +++ b/tests/integration/run_tests.py @@ -654,7 +654,9 @@ def setup_argparse(): parser.add_argument( "--verbose", action="store_true", help="Flag for more detailed output messages" ) - parser.add_argument("--local", action="store_true", default=None, help="Run only local tests") + parser.add_argument( + "--local", action="store_true", default=None, help="Run only local tests" + ) parser.add_argument( "--ids", action="store",