diff --git a/docs/source/kafka_rolling_restart.rst b/docs/source/kafka_rolling_restart.rst index f943fcd9..4f255bd7 100644 --- a/docs/source/kafka_rolling_restart.rst +++ b/docs/source/kafka_rolling_restart.rst @@ -34,6 +34,8 @@ The parameters specific for kafka-rolling-restart are: terminate with an error. Default 600. * ``--jolokia-port PORT``: The Jolokia port. Default 8778. * ``--jolokia-prefix PREFIX``: The Jolokia prefix. Default "jolokia/". +* ``--jolokia-user USERNAME``: Jolokia username. Default "None". +* ``--jolokia-password PASSWORD``: Jolokia password. Default "None". * ``--no-confirm``: If specified, the script will not ask for confirmation. * ``--skip N``: Skip the first N servers. Useful to recover from a partial rolling restart. Default 0. diff --git a/kafka_utils/kafka_rolling_restart/main.py b/kafka_utils/kafka_rolling_restart/main.py index e1675f77..3f1caa42 100644 --- a/kafka_utils/kafka_rolling_restart/main.py +++ b/kafka_utils/kafka_rolling_restart/main.py @@ -47,6 +47,8 @@ DEFAULT_TIME_LIMIT_SECS = 600 DEFAULT_JOLOKIA_PORT = 8778 DEFAULT_JOLOKIA_PREFIX = "jolokia/" +DEFAULT_JOLOKIA_USER = None +DEFAULT_JOLOKIA_PASSWORD = None DEFAULT_STOP_COMMAND = "service kafka stop" DEFAULT_START_COMMAND = "service kafka start" @@ -117,6 +119,18 @@ def parse_opts(): type=str, default=DEFAULT_JOLOKIA_PREFIX, ) + parser.add_argument( + '--jolokia-user', + help='the jolokia username. Default: %(default)s', + type=str, + default=DEFAULT_JOLOKIA_USER, + ) + parser.add_argument( + '--jolokia-password', + help='the jolokia password. Default: %(default)s', + type=str, + default=DEFAULT_JOLOKIA_PASSWORD, + ) parser.add_argument( '--no-confirm', help='proceed without asking confirmation. Default: %(default)s', @@ -195,7 +209,7 @@ def filter_broker_list(brokers, filter_by): return [(id, host) for id, host in brokers if id in filter_by_set] -def generate_requests(hosts, jolokia_port, jolokia_prefix): +def generate_requests(hosts, jolokia_port, jolokia_prefix, jolokia_user, jolokia_password): """Return a generator of requests to fetch the under replicated partition number from the specified hosts. @@ -205,9 +219,15 @@ def generate_requests(hosts, jolokia_port, jolokia_prefix): :type jolokia_port: integer :param jolokia_prefix: HTTP prefix on the server for the Jolokia queries :type jolokia_prefix: string + :param jolokia_user: Username for Jolokia, if needed + :type jolokia_user: string + :param jolokia_password: Password for Jolokia, if needed + :type jolokia_password: string :returns: generator of requests """ session = FuturesSession() + if jolokia_user and jolokia_password: + session.auth = (jolokia_user, jolokia_password) for host in hosts: url = "http://{host}:{port}/{prefix}/read/{key}".format( host=host, @@ -218,7 +238,7 @@ def generate_requests(hosts, jolokia_port, jolokia_prefix): yield host, session.get(url) -def read_cluster_status(hosts, jolokia_port, jolokia_prefix): +def read_cluster_status(hosts, jolokia_port, jolokia_prefix, jolokia_user, jolokia_password): """Read and return the number of under replicated partitions and missing brokers from the specified hosts. @@ -229,12 +249,19 @@ def read_cluster_status(hosts, jolokia_port, jolokia_prefix): :param jolokia_prefix: HTTP prefix on the server for the Jolokia queries :type jolokia_prefix: string :returns: tuple of integers + :param jolokia_user: Username for Jolokia, if needed + :type jolokia_user: string + :param jolokia_password: Password for Jolokia, if needed + :type jolokia_password: string """ under_replicated = 0 missing_brokers = 0 - for host, request in generate_requests(hosts, jolokia_port, jolokia_prefix): + for host, request in generate_requests(hosts, jolokia_port, jolokia_prefix, jolokia_user, jolokia_password): try: response = request.result() + if response.status_code == 401: + print("Jolokia Authentication Failed. Exiting.") + sys.exit(1) if 400 <= response.status_code <= 599: print("Got status code {0}. Exiting.".format(response.status_code)) sys.exit(1) @@ -300,6 +327,8 @@ def wait_for_stable_cluster( hosts, jolokia_port, jolokia_prefix, + jolokia_user, + jolokia_password, check_interval, check_count, unhealthy_time_limit, @@ -313,6 +342,10 @@ def wait_for_stable_cluster( :type jolokia_port: integer :param jolokia_prefix: HTTP prefix on the server for the Jolokia queries :type jolokia_prefix: string + :param jolokia_user: Username for Jolokia, if needed + :type jolokia_user: string + :param jolokia_password: Password for Jolokia, if needed + :type jolokia_password: string :param check_interval: the number of seconds it will wait between each check :type check_interval: integer :param check_count: the number of times the check should be positive before @@ -329,6 +362,8 @@ def wait_for_stable_cluster( hosts, jolokia_port, jolokia_prefix, + jolokia_user, + jolokia_password ) if partitions or brokers: stable_counter = 0 @@ -361,6 +396,8 @@ def execute_rolling_restart( brokers, jolokia_port, jolokia_prefix, + jolokia_user, + jolokia_password, check_interval, check_count, unhealthy_time_limit, @@ -385,6 +422,10 @@ def execute_rolling_restart( :type jolokia_port: integer :param jolokia_prefix: HTTP prefix on the server for the Jolokia queries :type jolokia_prefix: string + :param jolokia_user: Username for Jolokia, if needed + :type jolokia_user: string + :param jolokia_password: Password for Jolokia, if needed + :type jolokia_password: string :param check_interval: the number of seconds it will wait between each check :type check_interval: integer :param check_count: the number of times the check should be positive before @@ -417,6 +458,8 @@ def execute_rolling_restart( all_hosts, jolokia_port, jolokia_prefix, + jolokia_user, + jolokia_password, check_interval, 1 if n == 0 else check_count, unhealthy_time_limit, @@ -434,6 +477,8 @@ def execute_rolling_restart( all_hosts, jolokia_port, jolokia_prefix, + jolokia_user, + jolokia_password, check_interval, check_count, unhealthy_time_limit, @@ -544,6 +589,8 @@ def run(): brokers, opts.jolokia_port, opts.jolokia_prefix, + opts.jolokia_user, + opts.jolokia_password, opts.check_interval, opts.check_count, opts.unhealthy_time_limit, diff --git a/tests/kafka_rolling_restart/test_main.py b/tests/kafka_rolling_restart/test_main.py index 1effd3f0..00149be0 100644 --- a/tests/kafka_rolling_restart/test_main.py +++ b/tests/kafka_rolling_restart/test_main.py @@ -30,7 +30,7 @@ def test_read_cluster_value_partitions(mock_get): request = mock_get.return_value request.result.return_value = response - p, b = main.read_cluster_status(["host1", "host2", "host3"], 80, "jolokia") + p, b = main.read_cluster_status(["host1", "host2", "host3"], 80, "jolokia", None, None) assert p == 3 # 3 missing partitions assert b == 0 # 0 missing brokers @@ -44,7 +44,7 @@ def test_read_cluster_value_exit(mock_get): request.result.return_value = response with pytest.raises(SystemExit): - p, b = main.read_cluster_status(["host1"], 80, "jolokia") + p, b = main.read_cluster_status(["host1"], 80, "jolokia", None, None) @mock.patch.object(main.FuturesSession, 'get', autospec=True) @@ -55,7 +55,7 @@ def test_read_cluster_value_no_key(mock_get): request = mock_get.return_value request.result.return_value = response - p, b = main.read_cluster_status(["host1"], 80, "jolokia") + p, b = main.read_cluster_status(["host1"], 80, "jolokia", None, None) assert p == 0 # 0 missing partitions assert b == 1 # 1 missing brokers @@ -66,7 +66,7 @@ def test_read_cluster_value_server_down(mock_get): request = mock_get.return_value request.result.side_effect = RequestException - p, b = main.read_cluster_status(["host1"], 80, "jolokia") + p, b = main.read_cluster_status(["host1"], 80, "jolokia", None, None) assert p == 0 # 0 missing partitions assert b == 1 # 1 missing brokers @@ -87,7 +87,7 @@ def read_cluster_state_values(first_part, repeat): ) @mock.patch('time.sleep', autospec=True) def test_wait_for_stable_cluster_success(mock_sleep, mock_read): - main.wait_for_stable_cluster([], 1, "", 5, 3, 100) + main.wait_for_stable_cluster([], 1, "", None, None, 5, 3, 100) assert mock_read.call_count == 6 assert mock_sleep.mock_calls == [mock.call(5)] * 5 @@ -102,7 +102,7 @@ def test_wait_for_stable_cluster_success(mock_sleep, mock_read): @mock.patch('time.sleep', autospec=True) def test_wait_for_stable_cluster_timeout(mock_sleep, mock_read): with pytest.raises(main.WaitTimeoutException): - main.wait_for_stable_cluster([], 1, "", 5, 3, 100) + main.wait_for_stable_cluster([], 1, "", None, None, 5, 3, 100) assert mock_read.call_count == 21 assert mock_sleep.mock_calls == [mock.call(5)] * 20