Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigate no comms in docker #1113

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ USER flower

VOLUME $FLOWER_DATA_DIR

ENTRYPOINT ["flower"]
CMD ["celery flower"]
5 changes: 4 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ services:
- redis
flower:
build: ./
command: python -m flower -A tasks
command: celery -A tasks flower
volumes:
- ./examples:/data
working_dir: /data
Expand All @@ -40,3 +40,6 @@ services:
environment:
CELERY_BROKER_URL: redis://redis
CELERY_RESULT_BACKEND: redis://redis
depends_on:
- worker
- redis
2 changes: 1 addition & 1 deletion docs/prometheus-integration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ Start Flower Monitoring

In your Celery application folder run this command (Flower needs to be installed)::

celery flower -A tasks --broker=redis://localhost:6379/0
celery -A tasks --broker=redis://localhost:6379/0 flower

Configure and Start Prometheus
------------------------------
Expand Down
34 changes: 34 additions & 0 deletions flower/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from logging import NullHandler

import click
from celery.utils.time import humanize_seconds
from kombu.exceptions import OperationalError
from tornado.options import options
from tornado.options import parse_command_line, parse_config_file
from tornado.log import enable_pretty_logging
Expand Down Expand Up @@ -48,7 +50,12 @@ def sigterm_handler(signal, frame):
sys.exit(0)

signal.signal(signal.SIGTERM, sigterm_handler)

if not is_broker_connected(celery_app=app):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here in theory the connection is checked but can drop before we call flower.start().

The chances are super small though.
I will think about better way of doing this but for now it is good enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at kombu code:

    def ensure_connection(self, *args, **kwargs):
        """Public interface of _ensure_connection for retro-compatibility.

        Returns kombu.Connection instance.
        """
        self._ensure_connection(*args, **kwargs)
        return self

and they follow similar pattern (check connection and then return value) so I think we should be fine with making a check for connection and then calling flower.start()

return

print_banner(app, 'ssl_options' in settings)

try:
flower.start()
except (KeyboardInterrupt, SystemExit):
Expand Down Expand Up @@ -103,6 +110,33 @@ def warn_about_celery_args_used_in_flower_command(ctx, flower_args):
)


def is_broker_connected(celery_app):
is_connected = False
max_retries = celery_app.conf.broker_connection_max_retries

if not celery_app.conf.broker_connection_retry:
max_retries = 0

with celery_app.connection_or_acquire() as conn:
broker_url = conn.as_uri()

def _error_handler(exc, interval):
next_step = f"Trying again {humanize_seconds(interval, 'in', ' ')}... ({int(interval / 2)}/{max_retries})"
logger.error(f'Cannot connect to broker: {broker_url}. Error: {exc}. {next_step}')

try:
conn.ensure_connection(errback=_error_handler, max_retries=max_retries)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.info(f'Established connection to broker: {broker_url}. Starting Flower...')
is_connected = True
except OperationalError as e:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to find if kombu can raise other errors and add them here.

logger.error(
f'Unable to establish connection to broker: : {broker_url}. Error: {e}. '
f'Please make sure the broker is running when using Flower. Aborting Flower...'
)

return is_connected


def setup_logging():
if options.debug and options.logging == 'info':
options.logging = 'debug'
Expand Down
79 changes: 77 additions & 2 deletions tests/unit/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import tempfile
import unittest
import subprocess
from unittest.mock import Mock, patch
from unittest.mock import Mock, patch, create_autospec, MagicMock

import mock
from celery import Celery
from kombu.exceptions import OperationalError

from flower.command import apply_options, warn_about_celery_args_used_in_flower_command
from flower.command import apply_options, warn_about_celery_args_used_in_flower_command, is_broker_connected
from tornado.options import options
from tests.unit import AsyncHTTPTestCase

Expand Down Expand Up @@ -77,6 +79,79 @@ class FakeContext:
)


class TestIsBrokerConnected(AsyncHTTPTestCase):
@patch('flower.command.logger.info')
def test_returns_true_and_logs_if_connection_to_broker_established(self, mock_info):
broker_url = 'broker_url'
broker_connection_max_retries = 2

mock_conf = Mock(broker_connection_retry=True, broker_connection_max_retries=broker_connection_max_retries)

mock_connection = MagicMock(name='mock connection')
mock_connection.as_uri.return_value = broker_url
mock_connection.__enter__.return_value = mock_connection

mock_celery_app = create_autospec(Celery, conf=mock_conf)
mock_celery_app.connection_or_acquire.return_value = mock_connection

assert is_broker_connected(celery_app=mock_celery_app)

mock_connection.ensure_connection.assert_called_once()
ensure_connection_kwargs = mock_connection.ensure_connection.call_args_list[0][1]
assert '_error_handler' in str(ensure_connection_kwargs['errback'])
assert ensure_connection_kwargs['max_retries'] == broker_connection_max_retries

mock_info.assert_called_once_with(f'Established connection to broker: {broker_url}. Starting Flower...')

@patch('flower.command.logger.error')
def test_returns_false_and_logs_error_if_connection_to_broker_cannot_be_established(self, mock_error):
broker_url = 'broker_url'
broker_connection_max_retries = 2

mock_conf = Mock(broker_connection_retry=True, broker_connection_max_retries=broker_connection_max_retries)

mock_connection = MagicMock(name='mock connection')
mock_connection.as_uri.return_value = broker_url
error = OperationalError('test error')
mock_connection.ensure_connection.side_effect = error
mock_connection.__enter__.return_value = mock_connection

mock_celery_app = create_autospec(Celery, conf=mock_conf)
mock_celery_app.connection_or_acquire.return_value = mock_connection

assert not is_broker_connected(celery_app=mock_celery_app)

mock_connection.ensure_connection.assert_called_once()
ensure_connection_kwargs = mock_connection.ensure_connection.call_args_list[0][1]
assert '_error_handler' in str(ensure_connection_kwargs['errback'])
assert ensure_connection_kwargs['max_retries'] == broker_connection_max_retries

mock_error.assert_called_once_with(
f'Unable to establish connection to broker: : {broker_url}. Error: {error}. '
f'Please make sure the broker is running when using Flower. Aborting Flower...'
)

def test_disabled_broker_connection_retry_sets_max_retries_to_zero(self):
broker_url = 'broker_url'
broker_connection_max_retries = 2

mock_conf = Mock(broker_connection_retry=False, broker_connection_max_retries=broker_connection_max_retries)

mock_connection = MagicMock(name='mock connection')
mock_connection.as_uri.return_value = broker_url
mock_connection.__enter__.return_value = mock_connection

mock_celery_app = create_autospec(Celery, conf=mock_conf)
mock_celery_app.connection_or_acquire.return_value = mock_connection

assert is_broker_connected(celery_app=mock_celery_app)

mock_connection.ensure_connection.assert_called_once()
ensure_connection_kwargs = mock_connection.ensure_connection.call_args_list[0][1]
assert '_error_handler' in str(ensure_connection_kwargs['errback'])
assert ensure_connection_kwargs['max_retries'] == 0


class TestConfOption(AsyncHTTPTestCase):
def test_error_conf(self):
with self.mock_option('conf', None):
Expand Down