Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Adding Jolokia authentication support #227

Merged
merged 1 commit into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/kafka_rolling_restart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ The parameters specific for kafka-rolling-restart are:
terminate with an error. Default 600.
* ``--jolokia-port PORT``: The Jolokia port. Default 8778.
* ``--jolokia-prefix PREFIX``: The Jolokia prefix. Default "jolokia/".
* ``--jolokia-user USERNAME``: Jolokia username. Default "None".
* ``--jolokia-password PASSWORD``: Jolokia password. Default "None".
* ``--no-confirm``: If specified, the script will not ask for confirmation.
* ``--skip N``: Skip the first N servers. Useful to recover from a partial
rolling restart. Default 0.
Expand Down
53 changes: 50 additions & 3 deletions kafka_utils/kafka_rolling_restart/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
DEFAULT_TIME_LIMIT_SECS = 600
DEFAULT_JOLOKIA_PORT = 8778
DEFAULT_JOLOKIA_PREFIX = "jolokia/"
DEFAULT_JOLOKIA_USER = None
DEFAULT_JOLOKIA_PASSWORD = None
DEFAULT_STOP_COMMAND = "service kafka stop"
DEFAULT_START_COMMAND = "service kafka start"

Expand Down Expand Up @@ -117,6 +119,18 @@ def parse_opts():
type=str,
default=DEFAULT_JOLOKIA_PREFIX,
)
parser.add_argument(
'--jolokia-user',
help='the jolokia username. Default: %(default)s',
type=str,
default=DEFAULT_JOLOKIA_USER,
)
parser.add_argument(
'--jolokia-password',
help='the jolokia password. Default: %(default)s',
type=str,
default=DEFAULT_JOLOKIA_PASSWORD,
)
parser.add_argument(
'--no-confirm',
help='proceed without asking confirmation. Default: %(default)s',
Expand Down Expand Up @@ -195,7 +209,7 @@ def filter_broker_list(brokers, filter_by):
return [(id, host) for id, host in brokers if id in filter_by_set]


def generate_requests(hosts, jolokia_port, jolokia_prefix):
def generate_requests(hosts, jolokia_port, jolokia_prefix, jolokia_user, jolokia_password):
"""Return a generator of requests to fetch the under replicated
partition number from the specified hosts.

Expand All @@ -205,9 +219,15 @@ def generate_requests(hosts, jolokia_port, jolokia_prefix):
:type jolokia_port: integer
:param jolokia_prefix: HTTP prefix on the server for the Jolokia queries
:type jolokia_prefix: string
:param jolokia_user: Username for Jolokia, if needed
:type jolokia_user: string
:param jolokia_password: Password for Jolokia, if needed
:type jolokia_password: string
:returns: generator of requests
"""
session = FuturesSession()
if jolokia_user and jolokia_password:
session.auth = (jolokia_user, jolokia_password)
for host in hosts:
url = "http://{host}:{port}/{prefix}/read/{key}".format(
host=host,
Expand All @@ -218,7 +238,7 @@ def generate_requests(hosts, jolokia_port, jolokia_prefix):
yield host, session.get(url)


def read_cluster_status(hosts, jolokia_port, jolokia_prefix):
def read_cluster_status(hosts, jolokia_port, jolokia_prefix, jolokia_user, jolokia_password):
"""Read and return the number of under replicated partitions and
missing brokers from the specified hosts.

Expand All @@ -229,12 +249,19 @@ def read_cluster_status(hosts, jolokia_port, jolokia_prefix):
:param jolokia_prefix: HTTP prefix on the server for the Jolokia queries
:type jolokia_prefix: string
:returns: tuple of integers
:param jolokia_user: Username for Jolokia, if needed
:type jolokia_user: string
:param jolokia_password: Password for Jolokia, if needed
:type jolokia_password: string
"""
under_replicated = 0
missing_brokers = 0
for host, request in generate_requests(hosts, jolokia_port, jolokia_prefix):
for host, request in generate_requests(hosts, jolokia_port, jolokia_prefix, jolokia_user, jolokia_password):
try:
response = request.result()
if response.status_code == 401:
print("Jolokia Authentication Failed. Exiting.")
sys.exit(1)
if 400 <= response.status_code <= 599:
print("Got status code {0}. Exiting.".format(response.status_code))
sys.exit(1)
Expand Down Expand Up @@ -300,6 +327,8 @@ def wait_for_stable_cluster(
hosts,
jolokia_port,
jolokia_prefix,
jolokia_user,
jolokia_password,
check_interval,
check_count,
unhealthy_time_limit,
Expand All @@ -313,6 +342,10 @@ def wait_for_stable_cluster(
:type jolokia_port: integer
:param jolokia_prefix: HTTP prefix on the server for the Jolokia queries
:type jolokia_prefix: string
:param jolokia_user: Username for Jolokia, if needed
:type jolokia_user: string
:param jolokia_password: Password for Jolokia, if needed
:type jolokia_password: string
:param check_interval: the number of seconds it will wait between each check
:type check_interval: integer
:param check_count: the number of times the check should be positive before
Expand All @@ -329,6 +362,8 @@ def wait_for_stable_cluster(
hosts,
jolokia_port,
jolokia_prefix,
jolokia_user,
jolokia_password
)
if partitions or brokers:
stable_counter = 0
Expand Down Expand Up @@ -361,6 +396,8 @@ def execute_rolling_restart(
brokers,
jolokia_port,
jolokia_prefix,
jolokia_user,
jolokia_password,
check_interval,
check_count,
unhealthy_time_limit,
Expand All @@ -385,6 +422,10 @@ def execute_rolling_restart(
:type jolokia_port: integer
:param jolokia_prefix: HTTP prefix on the server for the Jolokia queries
:type jolokia_prefix: string
:param jolokia_user: Username for Jolokia, if needed
:type jolokia_user: string
:param jolokia_password: Password for Jolokia, if needed
:type jolokia_password: string
:param check_interval: the number of seconds it will wait between each check
:type check_interval: integer
:param check_count: the number of times the check should be positive before
Expand Down Expand Up @@ -417,6 +458,8 @@ def execute_rolling_restart(
all_hosts,
jolokia_port,
jolokia_prefix,
jolokia_user,
jolokia_password,
check_interval,
1 if n == 0 else check_count,
unhealthy_time_limit,
Expand All @@ -434,6 +477,8 @@ def execute_rolling_restart(
all_hosts,
jolokia_port,
jolokia_prefix,
jolokia_user,
jolokia_password,
check_interval,
check_count,
unhealthy_time_limit,
Expand Down Expand Up @@ -544,6 +589,8 @@ def run():
brokers,
opts.jolokia_port,
opts.jolokia_prefix,
opts.jolokia_user,
opts.jolokia_password,
opts.check_interval,
opts.check_count,
opts.unhealthy_time_limit,
Expand Down
12 changes: 6 additions & 6 deletions tests/kafka_rolling_restart/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_read_cluster_value_partitions(mock_get):
request = mock_get.return_value
request.result.return_value = response

p, b = main.read_cluster_status(["host1", "host2", "host3"], 80, "jolokia")
p, b = main.read_cluster_status(["host1", "host2", "host3"], 80, "jolokia", None, None)

assert p == 3 # 3 missing partitions
assert b == 0 # 0 missing brokers
Expand All @@ -44,7 +44,7 @@ def test_read_cluster_value_exit(mock_get):
request.result.return_value = response

with pytest.raises(SystemExit):
p, b = main.read_cluster_status(["host1"], 80, "jolokia")
p, b = main.read_cluster_status(["host1"], 80, "jolokia", None, None)


@mock.patch.object(main.FuturesSession, 'get', autospec=True)
Expand All @@ -55,7 +55,7 @@ def test_read_cluster_value_no_key(mock_get):
request = mock_get.return_value
request.result.return_value = response

p, b = main.read_cluster_status(["host1"], 80, "jolokia")
p, b = main.read_cluster_status(["host1"], 80, "jolokia", None, None)

assert p == 0 # 0 missing partitions
assert b == 1 # 1 missing brokers
Expand All @@ -66,7 +66,7 @@ def test_read_cluster_value_server_down(mock_get):
request = mock_get.return_value
request.result.side_effect = RequestException

p, b = main.read_cluster_status(["host1"], 80, "jolokia")
p, b = main.read_cluster_status(["host1"], 80, "jolokia", None, None)

assert p == 0 # 0 missing partitions
assert b == 1 # 1 missing brokers
Expand All @@ -87,7 +87,7 @@ def read_cluster_state_values(first_part, repeat):
)
@mock.patch('time.sleep', autospec=True)
def test_wait_for_stable_cluster_success(mock_sleep, mock_read):
main.wait_for_stable_cluster([], 1, "", 5, 3, 100)
main.wait_for_stable_cluster([], 1, "", None, None, 5, 3, 100)

assert mock_read.call_count == 6
assert mock_sleep.mock_calls == [mock.call(5)] * 5
Expand All @@ -102,7 +102,7 @@ def test_wait_for_stable_cluster_success(mock_sleep, mock_read):
@mock.patch('time.sleep', autospec=True)
def test_wait_for_stable_cluster_timeout(mock_sleep, mock_read):
with pytest.raises(main.WaitTimeoutException):
main.wait_for_stable_cluster([], 1, "", 5, 3, 100)
main.wait_for_stable_cluster([], 1, "", None, None, 5, 3, 100)

assert mock_read.call_count == 21
assert mock_sleep.mock_calls == [mock.call(5)] * 20