diff --git a/Dockerfile b/Dockerfile index 8fd4a933a..2715cdaee 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,4 +29,4 @@ USER flower VOLUME $FLOWER_DATA_DIR -ENTRYPOINT ["flower"] +CMD ["celery flower"] diff --git a/docker-compose.yml b/docker-compose.yml index 26ab86a0c..e70632e7d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ services: - redis flower: build: ./ - command: python -m flower -A tasks + command: celery -A tasks flower volumes: - ./examples:/data working_dir: /data @@ -40,3 +40,6 @@ services: environment: CELERY_BROKER_URL: redis://redis CELERY_RESULT_BACKEND: redis://redis + depends_on: + - worker + - redis diff --git a/docs/prometheus-integration.rst b/docs/prometheus-integration.rst index 3f9576128..7303869ff 100644 --- a/docs/prometheus-integration.rst +++ b/docs/prometheus-integration.rst @@ -144,7 +144,7 @@ Start Flower Monitoring In your Celery application folder run this command (Flower needs to be installed):: - celery flower -A tasks --broker=redis://localhost:6379/0 + celery -A tasks --broker=redis://localhost:6379/0 flower Configure and Start Prometheus ------------------------------ diff --git a/flower/command.py b/flower/command.py index ed5833c51..21d754a7b 100644 --- a/flower/command.py +++ b/flower/command.py @@ -9,6 +9,8 @@ from logging import NullHandler import click +from celery.utils.time import humanize_seconds +from kombu.exceptions import OperationalError from tornado.options import options from tornado.options import parse_command_line, parse_config_file from tornado.log import enable_pretty_logging @@ -48,7 +50,12 @@ def sigterm_handler(signal, frame): sys.exit(0) signal.signal(signal.SIGTERM, sigterm_handler) + + if not is_broker_connected(celery_app=app): + return + print_banner(app, 'ssl_options' in settings) + try: flower.start() except (KeyboardInterrupt, SystemExit): @@ -103,6 +110,33 @@ def warn_about_celery_args_used_in_flower_command(ctx, flower_args): ) +def is_broker_connected(celery_app): + is_connected = False + max_retries = celery_app.conf.broker_connection_max_retries + + if not celery_app.conf.broker_connection_retry: + max_retries = 0 + + with celery_app.connection_or_acquire() as conn: + broker_url = conn.as_uri() + + def _error_handler(exc, interval): + next_step = f"Trying again {humanize_seconds(interval, 'in', ' ')}... ({int(interval / 2)}/{max_retries})" + logger.error(f'Cannot connect to broker: {broker_url}. Error: {exc}. {next_step}') + + try: + conn.ensure_connection(errback=_error_handler, max_retries=max_retries) + logger.info(f'Established connection to broker: {broker_url}. Starting Flower...') + is_connected = True + except OperationalError as e: + logger.error( + f'Unable to establish connection to broker: : {broker_url}. Error: {e}. ' + f'Please make sure the broker is running when using Flower. Aborting Flower...' + ) + + return is_connected + + def setup_logging(): if options.debug and options.logging == 'info': options.logging = 'debug' diff --git a/tests/unit/test_command.py b/tests/unit/test_command.py index 117741f1c..817cfef86 100644 --- a/tests/unit/test_command.py +++ b/tests/unit/test_command.py @@ -3,11 +3,13 @@ import tempfile import unittest import subprocess -from unittest.mock import Mock, patch +from unittest.mock import Mock, patch, create_autospec, MagicMock import mock +from celery import Celery +from kombu.exceptions import OperationalError -from flower.command import apply_options, warn_about_celery_args_used_in_flower_command +from flower.command import apply_options, warn_about_celery_args_used_in_flower_command, is_broker_connected from tornado.options import options from tests.unit import AsyncHTTPTestCase @@ -77,6 +79,79 @@ class FakeContext: ) +class TestIsBrokerConnected(AsyncHTTPTestCase): + @patch('flower.command.logger.info') + def test_returns_true_and_logs_if_connection_to_broker_established(self, mock_info): + broker_url = 'broker_url' + broker_connection_max_retries = 2 + + mock_conf = Mock(broker_connection_retry=True, broker_connection_max_retries=broker_connection_max_retries) + + mock_connection = MagicMock(name='mock connection') + mock_connection.as_uri.return_value = broker_url + mock_connection.__enter__.return_value = mock_connection + + mock_celery_app = create_autospec(Celery, conf=mock_conf) + mock_celery_app.connection_or_acquire.return_value = mock_connection + + assert is_broker_connected(celery_app=mock_celery_app) + + mock_connection.ensure_connection.assert_called_once() + ensure_connection_kwargs = mock_connection.ensure_connection.call_args_list[0][1] + assert '_error_handler' in str(ensure_connection_kwargs['errback']) + assert ensure_connection_kwargs['max_retries'] == broker_connection_max_retries + + mock_info.assert_called_once_with(f'Established connection to broker: {broker_url}. Starting Flower...') + + @patch('flower.command.logger.error') + def test_returns_false_and_logs_error_if_connection_to_broker_cannot_be_established(self, mock_error): + broker_url = 'broker_url' + broker_connection_max_retries = 2 + + mock_conf = Mock(broker_connection_retry=True, broker_connection_max_retries=broker_connection_max_retries) + + mock_connection = MagicMock(name='mock connection') + mock_connection.as_uri.return_value = broker_url + error = OperationalError('test error') + mock_connection.ensure_connection.side_effect = error + mock_connection.__enter__.return_value = mock_connection + + mock_celery_app = create_autospec(Celery, conf=mock_conf) + mock_celery_app.connection_or_acquire.return_value = mock_connection + + assert not is_broker_connected(celery_app=mock_celery_app) + + mock_connection.ensure_connection.assert_called_once() + ensure_connection_kwargs = mock_connection.ensure_connection.call_args_list[0][1] + assert '_error_handler' in str(ensure_connection_kwargs['errback']) + assert ensure_connection_kwargs['max_retries'] == broker_connection_max_retries + + mock_error.assert_called_once_with( + f'Unable to establish connection to broker: : {broker_url}. Error: {error}. ' + f'Please make sure the broker is running when using Flower. Aborting Flower...' + ) + + def test_disabled_broker_connection_retry_sets_max_retries_to_zero(self): + broker_url = 'broker_url' + broker_connection_max_retries = 2 + + mock_conf = Mock(broker_connection_retry=False, broker_connection_max_retries=broker_connection_max_retries) + + mock_connection = MagicMock(name='mock connection') + mock_connection.as_uri.return_value = broker_url + mock_connection.__enter__.return_value = mock_connection + + mock_celery_app = create_autospec(Celery, conf=mock_conf) + mock_celery_app.connection_or_acquire.return_value = mock_connection + + assert is_broker_connected(celery_app=mock_celery_app) + + mock_connection.ensure_connection.assert_called_once() + ensure_connection_kwargs = mock_connection.ensure_connection.call_args_list[0][1] + assert '_error_handler' in str(ensure_connection_kwargs['errback']) + assert ensure_connection_kwargs['max_retries'] == 0 + + class TestConfOption(AsyncHTTPTestCase): def test_error_conf(self): with self.mock_option('conf', None):