Skip to content

Commit

Permalink
Address review comments and improve auth
Browse files Browse the repository at this point in the history
Auth now chooses between cert (#1) and bearer token (#2)
instead of sending both. Also renamed ssl --> tls for correctness
  • Loading branch information
hkaj committed Jan 25, 2017
1 parent a35be92 commit c5bc507
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 82 deletions.
21 changes: 12 additions & 9 deletions conf.d/kubernetes.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,32 @@ instances:
# kubelet port. It needs to be set if you are not using a default one (10250 or 10255)
# kubelet_port: 10255
#
# SSL configuration for querying kubelet.
# TLS configuration for querying kubelet.
#
# Available options are:
# - simple SSL validation (with readily available certificates)
# - disabling SSL validation
# (server auth)
# - simple TLS validation (with readily available certificates)
# - disabling TLS validation
# - providing the server certificate
# (client auth)
# - providing a client cert/key pair
# - using a service account bearer token
#
# The default is to try and use the read-only port that doesn't require SSL
# And to fall back to the HTTPS API with simple SSL validation.
# The default is to try and use the read-only port that doesn't require TLS
# And to fall back to the HTTPS API with simple TLS validation.
#
# Explicitly disabling ssl_verify should be used with caution:
# Explicitly disabling tls_verify should be used with caution:
# if an attacker sniffs the agent requests they will see the agent's service account bearer token.
#
# Providing a server cert enables SSL validation.
# Providing a server cert enables TLS validation.
#
# It is possible to providing a client cert/key pair for client auth.
# The agent also uses its pod bearer token (if available) when querying kubelet and the apiserver.
# If no such cert is passed, the agent authenticates using its pod bearer token (if available) when querying kubelet and the apiserver.
#
# The recommended way to pass certificates and keys to the agent is by using
# "Secret" Kubernetes object (and to mount them as volumes).
#
# kubelet_ssl_verify: True
# kubelet_tls_verify: True
# kubelet_cert: /path/to/ca.pem
# kubelet_client_crt: /path/to/client.crt
# kubelet_client_key: /path/to/client.key
Expand Down
60 changes: 27 additions & 33 deletions tests/checks/mock/test_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,21 +385,22 @@ class TestKubeutil(unittest.TestCase):
def setUp(self, _locate_kubelet):
self.kubeutil = KubeUtil()

def test_init_ssl_settings(self):
@mock.patch('os.path.exists', return_value=True)
def test_init_tls_settings(self, *args):
instances = [
# (instance, expected_result)
({}, {'verify': True}),
({'kubelet_ssl_verify': False}, {'verify': False}),
({'kubelet_ssl_verify': True}, {'verify': True}),
({'kubelet_ssl_verify': 'foo.pem'}, {'verify': 'foo.pem'}),
({'kubelet_tls_verify': False}, {'verify': False}),
({'kubelet_tls_verify': True}, {'verify': True}),
({'kubelet_tls_verify': 'foo.pem'}, {'verify': 'foo.pem'}),
({'kubelet_cert': 'foo.pem'}, {'verify': 'foo.pem'}),
({'kubelet_client_crt': 'client.crt', 'kubelet_client_key': 'client.key'},
{'verify': True, 'kubelet_client_cert': ('client.crt', 'client.key')}),
({'kubelet_ssl_verify': True, 'kubelet_client_crt': 'client.crt'}, {'verify': True}),
({'kubelet_tls_verify': True, 'kubelet_client_crt': 'client.crt'}, {'verify': True}),
({'kubelet_client_crt': 'client.crt'}, {'verify': True})
]
for instance, result in instances:
self.assertEqual(self.kubeutil._init_ssl_settings(instance), result)
self.assertEqual(self.kubeutil._init_tls_settings(instance), result)


##### Test _locate_kubelet #####
Expand All @@ -419,7 +420,7 @@ def test_locate_kubelet_no_auth_no_ssl(self, _get_hostname):
({'kubelet_port': '1337'}, 'http://test_docker_host:1337'),
({'host': 'test_explicit_host', 'kubelet_port': '1337'}, 'http://test_explicit_host:1337')
]
with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_kubelet_url', return_value=True):
with mock.patch('utils.kubernetes.kubeutil.KubeUtil.perform_kubelet_query', return_value=True):
for instance, result in no_auth_no_ssl_instances:
self.assertEqual(self.kubeutil._locate_kubelet(instance), result)

Expand All @@ -434,13 +435,13 @@ def test_locate_kubelet_no_auth_no_verify(self, _get_hostname):
]

def side_effect(url):
"""Mock KubeUtil.retrieve_kubelet_url"""
"""Mock KubeUtil.perform_kubelet_query"""
if url.startswith('https://'):
return True
else:
raise Exception()

with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_kubelet_url', side_effect=side_effect):
with mock.patch('utils.kubernetes.kubeutil.KubeUtil.perform_kubelet_query', side_effect=side_effect):
for instance, result in no_auth_no_verify_instances:
self.assertEqual(self.kubeutil._locate_kubelet(instance), result)

Expand All @@ -449,10 +450,10 @@ def side_effect(url):
@mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_auth_token', return_value='foo')
def test_locate_kubelet_verify_and_auth(self, *args):
"""
Test kubelet connection with SSL. Also look for auth token.
Test kubelet connection with TLS. Also look for auth token.
"""
no_auth_instances = [
# instance, ssl_settings, expected_result
# instance, tls_settings, expected_result
({}, {'verify': True}, 'https://test_k8s_host:10250'),
({'kubelet_port': '1337'}, {'verify': 'test.pem'}, 'https://test_k8s_host:1337'),
(
Expand All @@ -468,46 +469,39 @@ def test_locate_kubelet_verify_and_auth(self, *args):
]

def side_effect(url, **kwargs):
"""Mock KubeUtil.retrieve_kubelet_url"""
"""Mock KubeUtil.perform_kubelet_query"""
if url.startswith('https://') and '10255' not in url:
return True
else:
raise Exception()

# no auth / SSL with verify
for instance, ssl_settings, result in no_auth_instances:
# no auth / TLS with verify
for instance, tls_settings, result in no_auth_instances:
with mock.patch('utils.kubernetes.kubeutil.requests') as req:
req.get = mock.MagicMock(side_effect=side_effect)
self.kubeutil.ssl_settings = ssl_settings
self.kubeutil.tls_settings = tls_settings
self.assertEqual(self.kubeutil._locate_kubelet(instance), result)
req.get.assert_called_with(result + '/healthz', # test endpoint
timeout=10,
verify=ssl_settings.get('verify', False),
cert=ssl_settings.get('kubelet_client_cert'),
headers={'Authorization': 'Bearer foo'}, # auth
verify=tls_settings.get('verify', False),
headers={'Authorization': 'Bearer foo'} if 'kubelet_client_cert' not in tls_settings else None,
cert=tls_settings.get('kubelet_client_cert'),
params={'verbose': True}
)

@mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_auth_token', return_value='foo')
def test_get_node_hostname(self, _get_auth_tkn):
node_lists = [
(json.loads(Fixtures.read_file('filtered_node_list_1_4.json', string_escape=False)), 'ip-10-0-0-179'),
({'items': [{'foo': 'bar'}]}, None)
]

exception_node_lists = [
{'items': []},
{'items': [{'foo': 'bar'}, {'bar': 'foo'}]}
({'items': [{'foo': 'bar'}]}, None),
({'items': []}, None),
({'items': [{'foo': 'bar'}, {'bar': 'foo'}]}, None)
]

for node_list, expected_result in node_lists:
with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_json_auth', return_value=node_list):
self.assertEqual(self.kubeutil.get_node_hostname('ip-10-0-0-179'), expected_result)

for node_list in exception_node_lists:
with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_json_auth', return_value=node_list):
self.assertRaises(Exception, self.kubeutil.get_node_hostname, 'ip-10-0-0-179')

@mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list', side_effect=['foo'])
@mock.patch('utils.kubernetes.KubeUtil.extract_kube_labels')
def test_get_kube_labels(self, extract_kube_labels, retrieve_pods_list):
Expand Down Expand Up @@ -538,7 +532,7 @@ def test_extract_kube_labels(self):
labels = set(inn for out in res.values() for inn in out)
self.assertEqual(len(labels), 3)

@mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_kubelet_url')
@mock.patch('utils.kubernetes.kubeutil.KubeUtil.perform_kubelet_query')
def test_retrieve_pods_list(self, retrieve_url):
self.kubeutil.retrieve_pods_list()
retrieve_url.assert_called_twice_with(self.kubeutil.pods_list_url, verbose=True, timeout=10)
Expand All @@ -555,7 +549,7 @@ def test_retrieve_metrics(self, retrieve_json):

@mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_auth_token', return_value='foo')
@mock.patch('utils.kubernetes.kubeutil.requests')
def test_retrieve_kubelet_url(self, req, _get_auth_tkn):
def test_perform_kubelet_query(self, req, _get_auth_tkn):
base_params = {'timeout': 10, 'verify': False,
'params': {'verbose': True}, 'cert': None, 'headers': None}

Expand All @@ -570,12 +564,12 @@ def test_retrieve_kubelet_url(self, req, _get_auth_tkn):
('https://test.com', {'verify': True}, dict(base_params.items() + verify_true.items() + auth_token_header.items())),
('https://test.com', {'verify': 'kubelet.pem'}, dict(base_params.items() + verify_cert.items() + auth_token_header.items())),
('https://test.com', {'kubelet_client_cert': ('client.crt', 'client.key')},
dict(base_params.items() + verify_true.items() + client_cert.items() + auth_token_header.items())),
dict(base_params.items() + verify_true.items() + client_cert.items())),
]
for url, ssl_context, expected_params in instances:
req.get.reset_mock()
self.kubeutil.ssl_settings = ssl_context
self.kubeutil.retrieve_kubelet_url(url)
self.kubeutil.tls_settings = ssl_context
self.kubeutil.perform_kubelet_query(url)
req.get.assert_called_with(url, **expected_params)

@mock.patch('utils.kubernetes.kubeutil.requests')
Expand Down
85 changes: 45 additions & 40 deletions utils/kubernetes/kubeutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import os
from urlparse import urljoin
from urllib import urlencode

# project
from util import check_yaml
Expand All @@ -21,7 +22,7 @@

KUBERNETES_CHECK_NAME = 'kubernetes'

DEFAULT_SSL_VERIFY = True
DEFAULT_TLS_VERIFY = True


class KubeUtil:
Expand Down Expand Up @@ -67,11 +68,15 @@ def __init__(self, instance=None):
self.kubernetes_api_url = 'https://%s/api/v1' % (os.environ.get('KUBERNETES_SERVICE_HOST') or self.DEFAULT_MASTER_NAME)

# kubelet
self.ssl_settings = self._init_ssl_settings(instance)
self.kubelet_api_url = self._locate_kubelet(instance)
if not self.kubelet_api_url:
log.exception("Kubernetes check exiting, cannot run without access to kubelet.")
return
self.tls_settings = self._init_tls_settings(instance)
try:
self.kubelet_api_url = self._locate_kubelet(instance)
if not self.kubelet_api_url:
raise Exception("Couldn't find a method to connect to kubelet.")
except Exception as ex:
log.error("Kubernetes check exiting, cannot run without access to kubelet.")
raise ex

self.kubelet_host = self.kubelet_api_url.split(':')[1].lstrip('/')
self.pods_list_url = urljoin(self.kubelet_api_url, KubeUtil.PODS_LIST_PATH)
self.kube_health_url = urljoin(self.kubelet_api_url, KubeUtil.KUBELET_HEALTH_PATH)
Expand All @@ -86,24 +91,24 @@ def __init__(self, instance=None):
# default value is 0 but TTL for k8s events is one hour anyways
self.last_event_collection_ts = 0

def _init_ssl_settings(self, instance):
def _init_tls_settings(self, instance):
"""
Extract SSL settings from the config.
Extract TLS settings from the config.
"""
ssl_settings = {}
tls_settings = {}

client_crt = instance.get('kubelet_client_crt')
client_key = instance.get('kubelet_client_key')
if client_crt and client_key:
ssl_settings['kubelet_client_cert'] = (client_crt, client_key)
if client_crt and client_key and os.path.exists(client_crt) and os.path.exists(client_key):
tls_settings['kubelet_client_cert'] = (client_crt, client_key)

cert = instance.get('kubelet_cert')
if cert:
ssl_settings['verify'] = cert
tls_settings['verify'] = cert
else:
ssl_settings['verify'] = instance.get('kubelet_ssl_verify', DEFAULT_SSL_VERIFY)
tls_settings['verify'] = instance.get('kubelet_tls_verify', DEFAULT_TLS_VERIFY)

return ssl_settings
return tls_settings

def _locate_kubelet(self, instance):
"""
Expand All @@ -113,10 +118,10 @@ def _locate_kubelet(self, instance):
"""
host = os.environ.get('KUBERNETES_KUBELET_HOST') or instance.get("host")
if not host:
# if no hostname was provided, use the docker hostname if ssl
# if no hostname was provided, use the docker hostname if cert
# validation is not required, the kubernetes hostname otherwise.
docker_hostname = self.docker_util.get_hostname()
if self.ssl_settings['verify']:
if self.tls_settings['verify']:
try:
k8s_hostname = self.get_node_hostname(docker_hostname)
host = k8s_hostname or docker_hostname
Expand All @@ -130,30 +135,31 @@ def _locate_kubelet(self, instance):
port = instance.get('kubelet_port', KubeUtil.DEFAULT_HTTP_KUBELET_PORT)
no_auth_url = 'http://%s:%s' % (host, port)
test_url = urljoin(no_auth_url, KubeUtil.KUBELET_HEALTH_PATH)
self.retrieve_kubelet_url(test_url)
self.perform_kubelet_query(test_url)
return no_auth_url
except Exception:
log.debug("Couldn't query kubelet over HTTP, assuming it's not in no_auth mode.")

try:
port = instance.get('kubelet_port', KubeUtil.DEFAULT_HTTPS_KUBELET_PORT)
port = instance.get('kubelet_port', KubeUtil.DEFAULT_HTTPS_KUBELET_PORT)

https_url = 'https://%s:%s' % (host, port)
test_url = urljoin(https_url, KubeUtil.KUBELET_HEALTH_PATH)
self.retrieve_kubelet_url(test_url)
https_url = 'https://%s:%s' % (host, port)
test_url = urljoin(https_url, KubeUtil.KUBELET_HEALTH_PATH)
self.perform_kubelet_query(test_url)

return https_url
except Exception as ex:
log.error("Kubelet unreachable.\nPlease configure the no-auth endpoint or configure authentication: %s" % str(ex))
return https_url

def get_node_hostname(self, host):
node_filter = 'labelSelector=kubernetes.io/hostname'
"""
Query the API server for the kubernetes hostname of the node
using the docker hostname as a filter.
"""
node_filter = {'labelSelector': 'kubernetes.io/hostname=%s' % host}
node = self.retrieve_json_auth(
self.kubernetes_api_url + '/nodes?%s%%3D%s' % (node_filter, host),
self.kubernetes_api_url + '/nodes?%s' % urlencode(node_filter),
self.get_auth_token()
)
if len(node['items']) != 1:
raise Exception('Error while getting node hostname: expected 1 node, got %s.' % len(node['items']))
log.error('Error while getting node hostname: expected 1 node, got %s.' % len(node['items']))
else:
addresses = (node or {}).get('items', [{}])[0].get('status', {}).get('addresses', [])
for address in addresses:
Expand Down Expand Up @@ -195,7 +201,7 @@ def retrieve_pods_list(self):
TODO: the list of pods could be cached with some policy to be decided.
"""
return self.retrieve_kubelet_url(self.pods_list_url).json()
return self.perform_kubelet_query(self.pods_list_url).json()

def retrieve_machine_info(self):
"""
Expand All @@ -209,19 +215,18 @@ def retrieve_metrics(self):
"""
return retrieve_json(self.metrics_url)

def retrieve_kubelet_url(self, url, verbose=True, timeout=10):
def perform_kubelet_query(self, url, verbose=True, timeout=10):
"""
Perform and return a GET request against kubelet. Support auth and SSL validation.
Perform and return a GET request against kubelet. Support auth and TLS validation.
"""
ssl_context = self.ssl_settings

cert = headers = None
verify = ssl_context.get('verify', DEFAULT_SSL_VERIFY)
tls_context = self.tls_settings

if 'kubelet_client_cert' in ssl_context:
cert = ssl_context['kubelet_client_cert']
headers = None
cert = tls_context.get('kubelet_client_cert')
verify = tls_context.get('verify', DEFAULT_TLS_VERIFY)

if url.lower().startswith('https'):
# if cert-based auth is enabled, don't use the token.
if not cert and url.lower().startswith('https'):
headers = {'Authorization': 'Bearer {}'.format(self.get_auth_token())}

return requests.get(url, timeout=timeout, verify=verify,
Expand All @@ -232,11 +237,11 @@ def retrieve_json_auth(self, url, auth_token, timeout=10, verify=None):
Kubernetes API requires authentication using a token available in
every pod.
We try to verify ssl certificate if available.
We try to verify the certificate if available.
"""
if verify is None:
verify = self.CA_CRT_PATH if os.path.exists(self.CA_CRT_PATH) else False
log.debug('ssl validation: {}'.format(verify))
log.debug('tls validation: {}'.format(verify))
headers = {'Authorization': 'Bearer {}'.format(auth_token)}
r = requests.get(url, timeout=timeout, headers=headers, verify=verify)
r.raise_for_status()
Expand Down

0 comments on commit c5bc507

Please sign in to comment.