diff --git a/checks.d/kubernetes.py b/checks.d/kubernetes.py
index 1a2434fa6c..8efedc80f1 100644
--- a/checks.d/kubernetes.py
+++ b/checks.d/kubernetes.py
@@ -14,7 +14,6 @@
 import calendar
 
 # 3rd party
-import requests
 import simplejson as json
 
 # project
@@ -88,8 +87,8 @@ def __init__(self, name, init_config, agentConfig, instances=None):
         inst = instances[0] if instances is not None else None
         self.kubeutil = KubeUtil(instance=inst)
 
-        if not self.kubeutil.host:
-            raise Exception('Unable to retrieve Docker hostname and host parameter is not set')
+        if not self.kubeutil.kubelet_api_url:
+            raise Exception('Unable to reach kubelet. Try setting the host parameter.')
 
         self.k8s_namespace_regexp = None
         if inst:
@@ -104,8 +103,8 @@ def _perform_kubelet_checks(self, url):
         service_check_base = NAMESPACE + '.kubelet.check'
         is_ok = True
         try:
-            r = requests.get(url, params={'verbose': True})
-            for line in r.iter_lines():
+            req = self.kubeutil.retrieve_kubelet_url(url)
+            for line in req.iter_lines():
 
                 # avoid noise; this check is expected to fail since we override the container hostname
                 if line.find('hostname') != -1:
diff --git a/conf.d/kubernetes.yaml.example b/conf.d/kubernetes.yaml.example
index f2b33f7f48..8cfa35e10d 100644
--- a/conf.d/kubernetes.yaml.example
+++ b/conf.d/kubernetes.yaml.example
@@ -1,27 +1,42 @@
 init_config:
 
 instances:
-  # The kubernetes check retrieves metrics from cadvisor running under kubelet.
-  # By default we will assume we're running under docker and will use the address
-  # of the default router to reach the cadvisor api unless the environment variable
-  # KUBERNETES_KUBELET_HOST is found.
+  # The kubernetes check retrieves metrics from cadvisor running under kubelet on each node.
+  # It also queries kubelet for its health and for the list of pods running locally.
   #
-  # To override, e.g. in the case of a standalone cadvisor instance, use the following:
+  # By default we assume we're running under docker and
+  # that the kubelet read-only port with no auth is enabled.
+  #
+  # In this case we will use the address of the default router to
+  # reach the kubelet and cadvisor APIs unless the environment variable
+  # KUBERNETES_KUBELET_HOST is found (if your node names can be resolved by pods,
+  # we recommend you set this variable to spec.nodeName through the downward API).
+  #
+  # If the read-only endpoint is disabled, the check will query kubelet over HTTPS.
+  #
+  # To override this behavior, e.g. in the case of a standalone cadvisor instance, use the following:
   #
   # host: localhost
   # port: 4194
   # method: http
+
+  # cAdvisor port
   - port: 4194
+  # cAdvisor host
+  # host: localhost
 
+  # Authentication against the apiserver
+  #
   # By default the agent authenticates against the apiserver with its service account bearer token.
   # X509 certificate authentication is also supported.
   # To enable cert auth in place of bearer token auth, provide here the paths to
   # a client cert/key pair.
   # The recommended way to expose these files to the agent is by using Kubernetes Secrets.
+  #
   # apiserver_client_crt: /path/to/client.crt
   # apiserver_client_key: /path/to/client.key
   #
+  #
   # collect_events controls whether the agent should fetch events from the kubernetes API and
   # ingest them in Datadog. To avoid duplicates, only one agent at a time across the entire
   # cluster should have this feature enabled. To enable the feature, set the parameter to `true`.
@@ -48,8 +63,37 @@ instances:
   #
   # use_histogram: false
 
+  # kubelet port. It needs to be set if you are not using a default one (10250 or 10255).
   # kubelet_port: 10255
   #
+  # SSL configuration for querying kubelet.
+  #
+  # Available options are:
+  #   - simple SSL validation (with readily available certificates)
+  #   - disabling SSL validation
+  #   - providing the server certificate
+  #   - providing a client cert/key pair
+  #
+  # The default is to try the read-only port that doesn't require SSL,
+  # and to fall back to the HTTPS API with simple SSL validation.
+  #
+  # Explicitly disabling ssl_verify should be used with caution: if an attacker
+  # sniffs the agent requests they will see the agent's service account bearer token.
+  #
+  # Providing a server cert enables SSL validation.
+  #
+  # It is also possible to provide a client cert/key pair for client authentication.
+  # The agent also uses its pod bearer token (if available) when querying kubelet and the apiserver.
+  #
+  # The recommended way to pass certificates and keys to the agent is by using
+  # "Secret" Kubernetes objects (and to mount them as volumes).
+  #
+  # kubelet_ssl_verify: True
+  # kubelet_cert: /path/to/ca.pem
+  # kubelet_client_crt: /path/to/client.crt
+  # kubelet_client_key: /path/to/client.key
+  #
+  #
   # We can define a whitelist of patterns that permit publishing raw metrics.
   # enabled_rates:
   #   - cpu.*
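Review note: a minimal sketch of the fallback the config comments above describe, i.e. try the read-only no-auth port first, then fall back to the HTTPS API with SSL validation. The ports and the /healthz path mirror the patch, but the helper itself is illustrative and not part of it (the real logic is KubeUtil._locate_kubelet in the kubeutil.py diff below):

    # Illustrative sketch of the documented kubelet discovery fallback.
    import requests

    def locate_kubelet_sketch(host, http_port=10255, https_port=10250):
        # 1. prefer the read-only, unauthenticated endpoint
        try:
            url = 'http://%s:%s' % (host, http_port)
            requests.get(url + '/healthz', timeout=10).raise_for_status()
            return url
        except Exception:
            pass
        # 2. fall back to the authenticated HTTPS API with SSL validation
        url = 'https://%s:%s' % (host, https_port)
        requests.get(url + '/healthz', timeout=10, verify=True).raise_for_status()
        return url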
["weaveworks/weave-npc@sha256:39fd8697bcc6ccbcfdbb209e4028fb36002db0d97eec9cadd88120a0b18181a4", "weaveworks/weave-npc:1.8.2"]}, {"sizeBytes": 746888, "names": ["gcr.io/google_containers/pause-amd64@sha256:163ac025575b775d1c0f9bf0bdd0f086883171eb475b5068e7defa4ca9e76516", "gcr.io/google_containers/pause-amd64:3.0"]}], "conditions": [{"status": "False", "lastTransitionTime": "2017-01-11T16:25:13Z", "reason": "KubeletHasSufficientDisk", "lastHeartbeatTime": "2017-01-22T22:17:32Z", "message": "kubelet has sufficient disk space available", "type": "OutOfDisk"}, {"status": "False", "lastTransitionTime": "2017-01-11T16:25:13Z", "reason": "KubeletHasSufficientMemory", "lastHeartbeatTime": "2017-01-22T22:17:32Z", "message": "kubelet has sufficient memory available", "type": "MemoryPressure"}, {"status": "False", "lastTransitionTime": "2017-01-11T16:25:13Z", "reason": "KubeletHasNoDiskPressure", "lastHeartbeatTime": "2017-01-22T22:17:32Z", "message": "kubelet has no disk pressure", "type": "DiskPressure"}, {"status": "True", "lastTransitionTime": "2017-01-11T21:32:03Z", "reason": "KubeletReady", "lastHeartbeatTime": "2017-01-22T22:17:32Z", "message": "kubelet is posting ready status. AppArmor enabled", "type": "Ready"}]}, "spec": {"providerID": "aws:////i-0860ce2a36f75f436", "externalID": "ip-10-0-0-179"}, "metadata": {"name": "ip-10-0-0-179", "labels": {"kubernetes.io/hostname": "ip-10-0-0-179", "beta.kubernetes.io/os": "linux", "beta.kubernetes.io/arch": "amd64"}, "resourceVersion": "1360060", "creationTimestamp": "2017-01-11T16:25:13Z", "annotations": {"volumes.kubernetes.io/controller-managed-attach-detach": "true"}, "selfLink": "/api/v1/nodesip-10-0-0-179", "uid": "87a4a4af-d81a-11e6-aee2-121441fd904e"}}], "kind": "NodeList", "apiVersion": "v1", "metadata": {"selfLink": "/api/v1/nodes", "resourceVersion": "1360069"}} diff --git a/tests/checks/mock/test_kubernetes.py b/tests/checks/mock/test_kubernetes.py index 7589958b20..2151a36e29 100644 --- a/tests/checks/mock/test_kubernetes.py +++ b/tests/checks/mock/test_kubernetes.py @@ -12,7 +12,7 @@ # project from tests.checks.common import AgentCheckTest, Fixtures from checks import AgentCheck -from utils.kubernetes import KubeUtil +from utils.kubernetes.kubeutil import KubeUtil from utils.platform import Platform CPU = "CPU" @@ -65,6 +65,7 @@ class TestKubernetes(AgentCheckTest): side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.1.json"))) @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list', side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False))) + @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255') def test_fail_1_1(self, *args): # To avoid the disparition of some gauges during the second check config = { @@ -81,6 +82,7 @@ def test_fail_1_1(self, *args): side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.1.json"))) @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list', side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False))) + @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255') def test_metrics_1_1(self, *args): # To avoid the disparition of some gauges during the second check mocks = { @@ -139,6 +141,7 @@ def test_metrics_1_1(self, *args): side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.1.json"))) @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list', side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.1.json", 
diff --git a/tests/checks/mock/test_kubernetes.py b/tests/checks/mock/test_kubernetes.py
index 7589958b20..2151a36e29 100644
--- a/tests/checks/mock/test_kubernetes.py
+++ b/tests/checks/mock/test_kubernetes.py
@@ -12,7 +12,7 @@
 # project
 from tests.checks.common import AgentCheckTest, Fixtures
 from checks import AgentCheck
-from utils.kubernetes import KubeUtil
+from utils.kubernetes.kubeutil import KubeUtil
 from utils.platform import Platform
 
 CPU = "CPU"
@@ -65,6 +65,7 @@ class TestKubernetes(AgentCheckTest):
                 side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.1.json")))
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_fail_1_1(self, *args):
         # To avoid the disappearance of some gauges during the second check
         config = {
@@ -81,6 +82,7 @@ def test_fail_1_1(self, *args):
                 side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.1.json")))
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_metrics_1_1(self, *args):
         # To avoid the disappearance of some gauges during the second check
         mocks = {
@@ -139,6 +141,7 @@ def test_metrics_1_1(self, *args):
                 side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.1.json")))
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_historate_1_1(self, *args):
         # To avoid the disappearance of some gauges during the second check
         mocks = {
@@ -190,6 +193,7 @@ def test_historate_1_1(self, *args):
                 side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.2.json")))
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_fail_1_2(self, *args):
         # To avoid the disappearance of some gauges during the second check
         config = {
@@ -207,6 +211,7 @@ def test_fail_1_2(self, *args):
                 side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.2.json")))
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_metrics_1_2(self, *args):
         mocks = {
             '_perform_kubelet_checks': lambda x: None,
@@ -258,6 +263,7 @@ def test_metrics_1_2(self, *args):
                 side_effect=lambda: json.loads(Fixtures.read_file("metrics_1.2.json")))
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_historate_1_2(self, *args):
         # To avoid the disappearance of some gauges during the second check
         mocks = {
@@ -302,14 +308,13 @@ def test_historate_1_2(self, *args):
 
     @mock.patch('utils.kubernetes.KubeUtil.get_node_info',
                 side_effect=lambda: ('Foo', 'Bar'))
-    @mock.patch('utils.kubernetes.KubeUtil.filter_pods_list',
-                side_effect=lambda x, y: x)
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_json_auth',
                 side_effect=KubeUtil_fake_retrieve_json_auth)
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_machine_info')
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_metrics')
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list',
                 side_effect=lambda: json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False)))
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_events(self, *args):
         # default value for collect_events is False
         config = {'instances': [{'host': 'foo'}]}
@@ -330,12 +335,12 @@ def test_events(self, *args):
 
     @mock.patch('utils.kubernetes.KubeUtil.get_node_info',
                 side_effect=lambda: ('Foo', 'Bar'))
-    @mock.patch('utils.kubernetes.KubeUtil.filter_pods_list')
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_json_auth',
                 side_effect=KubeUtil_fake_retrieve_json_auth)
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_machine_info')
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_metrics')
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list')
+    @mock.patch('utils.kubernetes.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
     def test_namespaced_events(self, *args):
         # reset last event pulling time
         KubeUtil().last_event_collection_ts = 0
@@ -375,9 +380,134 @@ def test_namespaced_events(self, *args):
         self.assertEvent('hello-node-47289321-91tfd Scheduled on Bar', count=0, exact_match=False)
 
 class TestKubeutil(unittest.TestCase):
-    def setUp(self):
+
+    @mock.patch('utils.kubernetes.kubeutil.KubeUtil._locate_kubelet', return_value='http://172.17.0.1:10255')
+    def setUp(self, _locate_kubelet):
         self.kubeutil = KubeUtil()
 
+    def test_init_tls_settings(self):
+        instances = [
+            # (instance, expected_result)
+            ({}, {'kubelet_verify': True}),
+            ({'kubelet_ssl_verify': False}, {'kubelet_verify': False}),
+            ({'kubelet_ssl_verify': True}, {'kubelet_verify': True}),
+            ({'kubelet_ssl_verify': 'foo.pem'}, {'kubelet_verify': 'foo.pem'}),
+            ({'kubelet_cert': 'foo.pem'}, {'kubelet_verify': 'foo.pem'}),
+            ({'kubelet_client_crt': 'client.crt', 'kubelet_client_key': 'client.key'},
+             {'kubelet_verify': True, 'kubelet_client_cert': ('client.crt', 'client.key')}),
+            ({'kubelet_ssl_verify': True, 'kubelet_client_crt': 'client.crt'}, {'kubelet_verify': True}),
+            ({'kubelet_client_crt': 'client.crt'}, {'kubelet_verify': True})
+        ]
+        for instance, result in instances:
+            self.assertEqual(self.kubeutil._init_tls_settings(instance), result)
+
+    ##### Test _locate_kubelet #####
+
+    # We support connection to kubelet in 3 modes:
+    #  - no auth/no ssl --> over the read-only no-auth port
+    #  - no auth/yes ssl (no verify) --> over the port used by the apiserver, if anonymous requests are accepted
+    #  - yes auth/yes ssl (yes verify) --> same, but the user provided a way to verify kubelet's
+    #    cert and we attach a bearer token if available
+
+    @mock.patch('utils.kubernetes.kubeutil.DockerUtil.get_hostname', return_value='test_docker_host')
+    def test_locate_kubelet_no_auth_no_ssl(self, _get_hostname):
+        no_auth_no_ssl_instances = [
+            # instance, expected_result
+            ({}, 'http://test_docker_host:10255'),
+            ({'host': 'test_explicit_host'}, 'http://test_explicit_host:10255'),
+            ({'kubelet_port': '1337'}, 'http://test_docker_host:1337'),
+            ({'host': 'test_explicit_host', 'kubelet_port': '1337'}, 'http://test_explicit_host:1337')
+        ]
+        with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_kubelet_url', return_value=True):
+            for instance, result in no_auth_no_ssl_instances:
+                self.assertEqual(self.kubeutil._locate_kubelet(instance), result)
+
+    @mock.patch('utils.kubernetes.kubeutil.DockerUtil.get_hostname', return_value='test_docker_host')
+    def test_locate_kubelet_no_auth_no_verify(self, _get_hostname):
+        no_auth_no_verify_instances = [
+            # instance, expected_result
+            ({}, 'https://test_docker_host:10250'),
+            ({'kubelet_port': '1337'}, 'https://test_docker_host:1337'),
+            ({'host': 'test_explicit_host'}, 'https://test_explicit_host:10250'),
+            ({'host': 'test_explicit_host', 'kubelet_port': '1337'}, 'https://test_explicit_host:1337'),
+        ]
+
+        def side_effect(url):
+            """Mock KubeUtil.retrieve_kubelet_url: fail over HTTP, succeed over HTTPS"""
+            if url.startswith('https://'):
+                return True
+            else:
+                raise Exception()
+
+        with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_kubelet_url', side_effect=side_effect):
+            for instance, result in no_auth_no_verify_instances:
+                self.assertEqual(self.kubeutil._locate_kubelet(instance), result)
+
+    @mock.patch('utils.kubernetes.kubeutil.DockerUtil.get_hostname', return_value='test_docker_host')
+    @mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_node_hostname', return_value='test_k8s_host')
+    @mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_auth_token', return_value='foo')
+    def test_locate_kubelet_verify_and_auth(self, *args):
+        """
+        Test kubelet connection with SSL. Also look for the auth token.
+        """
+        no_auth_instances = [
+            # instance, tls_settings, expected_result
+            ({}, {'kubelet_verify': True}, 'https://test_k8s_host:10250'),
+            ({'kubelet_port': '1337'}, {'kubelet_verify': 'test.pem'}, 'https://test_k8s_host:1337'),
+            (
+                {'host': 'test_explicit_host'},
+                {'kubelet_verify': True, 'kubelet_client_cert': ('client.crt', 'client.key')},
+                'https://test_explicit_host:10250'
+            ),
+            (
+                {'host': 'test_explicit_host', 'kubelet_port': '1337'},
+                {'kubelet_verify': True},
+                'https://test_explicit_host:1337'
+            ),
+        ]
+
+        def side_effect(url, **kwargs):
+            """Mock requests.get: only succeed over HTTPS, away from the read-only port"""
+            if url.startswith('https://') and '10255' not in url:
+                return True
+            else:
+                raise Exception()
+
+        # no auth / SSL with verify
+        for instance, tls_settings, result in no_auth_instances:
+            with mock.patch('utils.kubernetes.kubeutil.requests') as req:
+                req.get = mock.MagicMock(side_effect=side_effect)
+                self.kubeutil.tls_settings = tls_settings
+                self.assertEqual(self.kubeutil._locate_kubelet(instance), result)
+                req.get.assert_called_with(result + '/healthz',  # test endpoint
+                                           timeout=10,
+                                           verify=tls_settings.get('kubelet_verify', False),
+                                           cert=tls_settings.get('kubelet_client_cert'),
+                                           headers={'Authorization': 'Bearer foo'},  # auth
+                                           params={'verbose': True})
+
+    @mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_auth_token', return_value='foo')
+    def test_get_node_hostname(self, _get_auth_tkn):
+        node_lists = [
+            (json.loads(Fixtures.read_file('filtered_node_list_1_4.json', string_escape=False)), 'ip-10-0-0-179'),
+            ({'items': [{'foo': 'bar'}]}, None)
+        ]
+
+        exception_node_lists = [
+            {'items': []},
+            {'items': [{'foo': 'bar'}, {'bar': 'foo'}]}
+        ]
+
+        for node_list, expected_result in node_lists:
+            with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_json_auth', return_value=node_list):
+                self.assertEqual(self.kubeutil.get_node_hostname('ip-10-0-0-179'), expected_result)
+
+        for node_list in exception_node_lists:
+            with mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_json_auth', return_value=node_list):
+                self.assertRaises(Exception, self.kubeutil.get_node_hostname, 'ip-10-0-0-179')
+
     @mock.patch('utils.kubernetes.KubeUtil.retrieve_pods_list', side_effect=['foo'])
     @mock.patch('utils.kubernetes.KubeUtil.extract_kube_labels')
     def test_get_kube_labels(self, extract_kube_labels, retrieve_pods_list):
@@ -421,29 +551,10 @@ def test_extract_kube_labels(self):
         labels = set(inn for out in res.values() for inn in out)
         self.assertEqual(len(labels), 3)
 
-    def test_extract_meta(self):
-        """
-        Test with both 1.1 and 1.2 version payloads
-        """
-        res = self.kubeutil.extract_meta({}, 'foo')
-        self.assertEqual(len(res), 0)
-
-        pods = json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False))
-        res = self.kubeutil.extract_meta(pods, 'foo')
-        self.assertEqual(len(res), 0)
-        res = self.kubeutil.extract_meta(pods, 'uid')
-        self.assertEqual(len(res), 6)
-
-        pods = json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False))
-        res = self.kubeutil.extract_meta(pods, 'foo')
-        self.assertEqual(len(res), 0)
-        res = self.kubeutil.extract_meta(pods, 'uid')
-        self.assertEqual(len(res), 4)
-
-    @mock.patch('utils.kubernetes.kubeutil.retrieve_json')
-    def test_retrieve_pods_list(self, retrieve_json):
+    @mock.patch('utils.kubernetes.kubeutil.KubeUtil.retrieve_kubelet_url')
+    def test_retrieve_pods_list(self, retrieve_url):
         self.kubeutil.retrieve_pods_list()
-        retrieve_json.assert_called_once_with(self.kubeutil.pods_list_url)
+        retrieve_url.assert_called_once_with(self.kubeutil.pods_list_url)
 
     @mock.patch('utils.kubernetes.kubeutil.retrieve_json')
     def test_retrieve_machine_info(self, retrieve_json):
@@ -455,28 +566,30 @@ def test_retrieve_metrics(self, retrieve_json):
         self.kubeutil.retrieve_metrics()
         retrieve_json.assert_called_once_with(self.kubeutil.metrics_url)
 
-    def test_filter_pods_list(self):
-        """
-        Test with both 1.1 and 1.2 version payloads
-        """
-        res = self.kubeutil.filter_pods_list({}, 'foo')
-        self.assertEqual(len(res.get('items')), 0)
-
-        pods = json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False))
-        res = self.kubeutil.filter_pods_list(pods, '10.240.0.9')
-        self.assertEqual(len(res.get('items')), 5)
-
-        pods = json.loads(Fixtures.read_file("pods_list_1.1.json", string_escape=False))
-        res = self.kubeutil.filter_pods_list(pods, 'foo')
-        self.assertEqual(len(res.get('items')), 0)
+    @mock.patch('utils.kubernetes.kubeutil.KubeUtil.get_auth_token', return_value='foo')
+    @mock.patch('utils.kubernetes.kubeutil.requests')
+    def test_retrieve_kubelet_url(self, req, _get_auth_tkn):
+        base_params = {'timeout': 10, 'verify': False,
+                       'params': {'verbose': True}, 'cert': None, 'headers': None}
 
-        pods = json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False))
-        res = self.kubeutil.filter_pods_list(pods, '10.240.0.5')
-        self.assertEqual(len(res.get('items')), 1)
+        auth_token_header = {'headers': {'Authorization': 'Bearer %s' % self.kubeutil.get_auth_token()}}
+        verify_true = {'verify': True}
+        verify_cert = {'verify': 'kubelet.pem'}
+        client_cert = {'cert': ('client.crt', 'client.key')}
 
-        pods = json.loads(Fixtures.read_file("pods_list_1.2.json", string_escape=False))
-        res = self.kubeutil.filter_pods_list(pods, 'foo')
-        self.assertEqual(len(res.get('items')), 0)
+        instances = [
+            ('http://test.com', {}, dict(base_params.items() + verify_true.items())),
+            ('https://test.com', {}, dict(base_params.items() + verify_true.items() + auth_token_header.items())),
+            ('https://test.com', {'kubelet_verify': True}, dict(base_params.items() + verify_true.items() + auth_token_header.items())),
+            ('https://test.com', {'kubelet_verify': 'kubelet.pem'}, dict(base_params.items() + verify_cert.items() + auth_token_header.items())),
+            ('https://test.com', {'kubelet_client_cert': ('client.crt', 'client.key')},
+             dict(base_params.items() + verify_true.items() + client_cert.items() + auth_token_header.items())),
+        ]
+        for url, tls_context, expected_params in instances:
+            req.get.reset_mock()
+            self.kubeutil.tls_settings = tls_context
+            self.kubeutil.retrieve_kubelet_url(url)
+            req.get.assert_called_with(url, **expected_params)
 
     @mock.patch('utils.kubernetes.kubeutil.requests')
     def test_retrieve_json_auth(self, r):
diff --git a/utils/http.py b/utils/http.py
index e0795a5f89..eb46087466 100644
--- a/utils/http.py
+++ b/utils/http.py
@@ -8,7 +8,7 @@
 DEFAULT_TIMEOUT = 10
 
 
-def retrieve_json(url, timeout=DEFAULT_TIMEOUT):
-    r = requests.get(url, timeout=timeout)
+def retrieve_json(url, timeout=DEFAULT_TIMEOUT, verify=True):
+    r = requests.get(url, timeout=timeout, verify=verify)
     r.raise_for_status()
     return r.json()
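Review note: a minimal sketch of how the new kubelet TLS settings are expected to map onto requests keyword arguments; the authoritative version is KubeUtil.retrieve_kubelet_url in the kubeutil.py diff below, and kubelet_get_sketch here is illustrative only:

    # Illustrative mapping from the check's kubelet TLS settings to requests kwargs.
    import requests

    def kubelet_get_sketch(url, tls_settings, bearer_token=None, verbose=True):
        verify = tls_settings.get('kubelet_verify', True)  # bool or CA bundle path
        cert = tls_settings.get('kubelet_client_cert')     # (crt, key) tuple or None
        headers = None
        if url.lower().startswith('https') and bearer_token:
            # only send the service account token over TLS
            headers = {'Authorization': 'Bearer {}'.format(bearer_token)}
        return requests.get(url, timeout=10, verify=verify, cert=cert,
                            headers=headers, params={'verbose': verbose})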
diff --git a/utils/kubernetes/kubeutil.py b/utils/kubernetes/kubeutil.py
index 87f184cfcd..b50188376d 100644
--- a/utils/kubernetes/kubeutil.py
+++ b/utils/kubernetes/kubeutil.py
@@ -21,16 +21,20 @@
 
 KUBERNETES_CHECK_NAME = 'kubernetes'
 
+DEFAULT_SSL_VERIFY = True
+
 
 class KubeUtil:
     __metaclass__ = Singleton
 
     DEFAULT_METHOD = 'http'
+    KUBELET_HEALTH_PATH = '/healthz'
     MACHINE_INFO_PATH = '/api/v1.3/machine/'
     METRICS_PATH = '/api/v1.3/subcontainers/'
     PODS_LIST_PATH = '/pods/'
 
     DEFAULT_CADVISOR_PORT = 4194
-    DEFAULT_KUBELET_PORT = 10255
+    DEFAULT_HTTP_KUBELET_PORT = 10255
+    DEFAULT_HTTPS_KUBELET_PORT = 10250
     DEFAULT_MASTER_PORT = 8080
     DEFAULT_MASTER_NAME = 'kubernetes'  # DNS name to reach the master from a pod.
     CA_CRT_PATH = '/run/secrets/kubernetes.io/serviceaccount/ca.crt'
@@ -56,23 +60,27 @@ def __init__(self, instance=None):
             instance = {}
 
         self.method = instance.get('method', KubeUtil.DEFAULT_METHOD)
-        self.host = instance.get("host") or self.docker_util.get_hostname()
-        self.kubelet_host = os.environ.get('KUBERNETES_KUBELET_HOST') or self.host
         self._node_ip = self._node_name = None  # lazy evaluation
         self.host_name = os.environ.get('HOSTNAME')
+        self.tls_settings = self._init_tls_settings(instance)
 
-        self.cadvisor_port = instance.get('port', KubeUtil.DEFAULT_CADVISOR_PORT)
-        self.kubelet_port = instance.get('kubelet_port', KubeUtil.DEFAULT_KUBELET_PORT)
-
-        self.kubelet_api_url = '%s://%s:%d' % (self.method, self.kubelet_host, self.kubelet_port)
-        self.cadvisor_url = '%s://%s:%d' % (self.method, self.kubelet_host, self.cadvisor_port)
+        # apiserver
         self.kubernetes_api_url = 'https://%s/api/v1' % (os.environ.get('KUBERNETES_SERVICE_HOST') or
                                                          self.DEFAULT_MASTER_NAME)
-        self.tls_settings = self._init_tls_settings(instance)
 
+        # kubelet
+        self.kubelet_api_url = self._locate_kubelet(instance)
+        if not self.kubelet_api_url:
+            log.exception("Kubernetes check exiting, cannot run without access to kubelet.")
+            return
+        self.kubelet_host = self.kubelet_api_url.split(':')[1].lstrip('/')
+        self.pods_list_url = urljoin(self.kubelet_api_url, KubeUtil.PODS_LIST_PATH)
+        self.kube_health_url = urljoin(self.kubelet_api_url, KubeUtil.KUBELET_HEALTH_PATH)
+
+        # cadvisor
+        self.cadvisor_port = instance.get('port', KubeUtil.DEFAULT_CADVISOR_PORT)
+        self.cadvisor_url = '%s://%s:%d' % (self.method, self.kubelet_host, self.cadvisor_port)
         self.metrics_url = urljoin(self.cadvisor_url, KubeUtil.METRICS_PATH)
         self.machine_info_url = urljoin(self.cadvisor_url, KubeUtil.MACHINE_INFO_PATH)
-        self.pods_list_url = urljoin(self.kubelet_api_url, KubeUtil.PODS_LIST_PATH)
-        self.kube_health_url = urljoin(self.kubelet_api_url, 'healthz')
 
         # keep track of the latest k8s event we collected and posted
         # default value is 0 but TTL for k8s events is one hour anyways
@@ -84,6 +92,7 @@ def _init_tls_settings(self, instance):
         """
         tls_settings = {}
 
+        # apiserver
         client_crt = instance.get('apiserver_client_crt')
         client_key = instance.get('apiserver_client_key')
         if client_crt and client_key and os.path.exists(client_crt) and os.path.exists(client_key):
@@ -93,8 +102,77 @@ def _init_tls_settings(self, instance):
         if token:
             tls_settings['bearer_token'] = token
 
+        # kubelet
+        kubelet_client_crt = instance.get('kubelet_client_crt')
+        kubelet_client_key = instance.get('kubelet_client_key')
+        if kubelet_client_crt and kubelet_client_key:
+            tls_settings['kubelet_client_cert'] = (kubelet_client_crt, kubelet_client_key)
+
+        cert = instance.get('kubelet_cert')
+        if cert:
+            tls_settings['kubelet_verify'] = cert
+        else:
+            tls_settings['kubelet_verify'] = instance.get('kubelet_ssl_verify', DEFAULT_SSL_VERIFY)
+
         return tls_settings
 
+    def _locate_kubelet(self, instance):
+        """
+        Kubelet may or may not accept unauthenticated HTTP requests.
+        If it doesn't, we need to use its HTTPS API, which may or may not
+        require auth itself.
+        """
+        host = os.environ.get('KUBERNETES_KUBELET_HOST') or instance.get("host")
+        if not host:
+            # If no hostname was provided, use the docker hostname if SSL
+            # validation is not required, and the kubernetes hostname otherwise.
+            docker_hostname = self.docker_util.get_hostname()
+            if self.tls_settings['kubelet_verify']:
+                try:
+                    k8s_hostname = self.get_node_hostname(docker_hostname)
+                    host = k8s_hostname or docker_hostname
+                except Exception as ex:
+                    log.error(str(ex))
+                    host = docker_hostname
+            else:
+                host = docker_hostname
+
+        try:
+            # check if the no-auth endpoint is enabled
+            port = instance.get('kubelet_port', KubeUtil.DEFAULT_HTTP_KUBELET_PORT)
+            no_auth_url = 'http://%s:%s' % (host, port)
+            test_url = urljoin(no_auth_url, KubeUtil.KUBELET_HEALTH_PATH)
+            self.retrieve_kubelet_url(test_url)
+            return no_auth_url
+        except Exception:
+            log.debug("Couldn't query kubelet over HTTP, assuming it's not in no_auth mode.")
+
+        try:
+            port = instance.get('kubelet_port', KubeUtil.DEFAULT_HTTPS_KUBELET_PORT)
+            https_url = 'https://%s:%s' % (host, port)
+            test_url = urljoin(https_url, KubeUtil.KUBELET_HEALTH_PATH)
+            self.retrieve_kubelet_url(test_url)
+            return https_url
+        except Exception as ex:
+            log.error("Kubelet unreachable.\nPlease enable the no-auth endpoint or configure authentication: %s" % str(ex))
+
+    def get_node_hostname(self, host):
+        """
+        Query the apiserver for the kubernetes hostname of the node,
+        using the docker hostname as a label filter.
+        """
+        node_filter = 'labelSelector=kubernetes.io/hostname'
+        node = self.retrieve_json_auth(
+            self.kubernetes_api_url + '/nodes?%s%%3D%s' % (node_filter, host)
+        )
+        if len(node['items']) != 1:
+            raise Exception('Error while getting node hostname: expected 1 node, got %s.' % len(node['items']))
+        else:
+            addresses = node['items'][0].get('status', {}).get('addresses', [])
+            for address in addresses:
+                if address.get('type') == 'Hostname':
+                    return address['address']
+        return None
+
     def get_kube_labels(self, excluded_keys=None):
         pods = self.retrieve_pods_list()
         return self.extract_kube_labels(pods, excluded_keys=excluded_keys)
@@ -123,28 +201,13 @@ def extract_kube_labels(self, pods_list, excluded_keys=None):
 
         return kube_labels
 
-    def extract_meta(self, pods_list, field_name):
-        """
-        Exctract fields like `uid` or `name` from the `metadata` section of a
-        list of pods coming from the kubelet API.
-
-        TODO: currently not in use, was added to support events filtering, consider to remove it.
-        """
-        uids = []
-        pods = pods_list.get("items") or []
-        for p in pods:
-            value = p.get('metadata', {}).get(field_name)
-            if value is not None:
-                uids.append(value)
-        return uids
-
     def retrieve_pods_list(self):
         """
         Retrieve the list of pods for this cluster querying the kubelet API.
 
         TODO: the list of pods could be cached with some policy to be decided.
         """
-        return retrieve_json(self.pods_list_url)
+        return self.retrieve_kubelet_url(self.pods_list_url).json()
 
     def retrieve_machine_info(self):
         """
@@ -158,26 +221,25 @@ def retrieve_metrics(self):
         """
         return retrieve_json(self.metrics_url)
 
-    def filter_pods_list(self, pods_list, host_ip):
+    def retrieve_kubelet_url(self, url, verbose=True, timeout=10):
         """
-        Filter out (in place) pods that are not running on the given host.
-
-        TODO: currently not in use, was added to support events filtering, consider to remove it.
+        Perform and return a GET request against kubelet. Supports auth and SSL validation.
         """
-        pod_items = pods_list.get('items') or []
-        log.debug('Found {} pods to filter'.format(len(pod_items)))
+        ssl_context = self.tls_settings
 
-        filtered_pods = []
-        for pod in pod_items:
-            status = pod.get('status', {})
-            if status.get('hostIP') == host_ip:
-                filtered_pods.append(pod)
-        log.debug('Pods after filtering: {}'.format(len(filtered_pods)))
+        cert = headers = None
+        verify = ssl_context.get('kubelet_verify', DEFAULT_SSL_VERIFY)
+
+        if 'kubelet_client_cert' in ssl_context:
+            cert = ssl_context['kubelet_client_cert']
+
+        if url.lower().startswith('https'):
+            headers = {'Authorization': 'Bearer {}'.format(self.get_auth_token())}
 
-        pods_list['items'] = filtered_pods
-        return pods_list
+        return requests.get(url, timeout=timeout, verify=verify,
+                            cert=cert, headers=headers, params={'verbose': verbose})
 
-    def retrieve_json_auth(self, url, timeout=10):
+    def retrieve_json_auth(self, url, timeout=10, verify=None):
         """
         Kubernetes API requires authentication using a token available in
         every pod, or with a client X509 cert/key pair.
@@ -187,7 +249,8 @@
         We try to verify the server TLS cert if the public cert is available.
         """
-        verify = self.CA_CRT_PATH if os.path.exists(self.CA_CRT_PATH) else False
+        if verify is None:
+            verify = self.CA_CRT_PATH if os.path.exists(self.CA_CRT_PATH) else False
         log.debug('ssl validation: {}'.format(verify))
 
         cert = self.tls_settings.get('apiserver_client_cert')
@@ -209,7 +272,7 @@ def get_node_info(self):
     def _fetch_host_data(self):
         """
         Retrieve host name and IP address from the payload returned by the listing
-        pods endpoints from kubelet or kubernetes API.
+        pods endpoint from kubelet.
 
         The host IP address is different from the default router for the pod.
         """
diff --git a/utils/service_discovery/sd_docker_backend.py b/utils/service_discovery/sd_docker_backend.py
index 5e0a4515f2..892b47c3b9 100644
--- a/utils/service_discovery/sd_docker_backend.py
+++ b/utils/service_discovery/sd_docker_backend.py
@@ -92,9 +92,13 @@ def __init__(self, agentConfig):
         AbstractSDBackend.__init__(self, agentConfig)
 
     def _make_fetch_state(self):
-        return _SDDockerBackendConfigFetchState(
-            self.docker_client.inspect_container,
-            self.kubeutil.retrieve_pods_list().get('items', []) if Platform.is_k8s() else None)
+        pod_list = []
+        if Platform.is_k8s():
+            try:
+                pod_list = self.kubeutil.retrieve_pods_list().get('items', [])
+            except Exception as ex:
+                log.warning("Failed to retrieve pod list: %s" % str(ex))
+        return _SDDockerBackendConfigFetchState(self.docker_client.inspect_container, pod_list)
 
     def update_checks(self, changed_containers):
         state = self._make_fetch_state()