[kubernetes] Introduce leader election among agents for event collection #3476

Merged: 9 commits, Aug 22, 2017
13 changes: 7 additions & 6 deletions tests/core/test_kube_event_retriever.py
@@ -7,6 +7,7 @@
# project
from utils.kubernetes import KubeEventRetriever
from tests.core.test_kubeutil import KubeTestCase
+ from .test_orchestrator import MockResponse


class TestKubeEventRetriever(KubeTestCase):
@@ -26,7 +27,7 @@ def _build_events(cls, specs):
return {'items': items}

def test_events_resversion_filtering(self):
- jsons = self._load_json_array(
+ jsons = self._load_resp_array(
['service_cache_events1.json', 'service_cache_events2.json', 'service_cache_events2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
retr = KubeEventRetriever(self.kube)
@@ -43,7 +44,7 @@ def test_events_resversion_filtering(self):

@patch('time.time')
def test_events_delay(self, mock_time):
- jsons = self._load_json_array(
+ jsons = self._load_resp_array(
['service_cache_events1.json', 'service_cache_events2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
retr = KubeEventRetriever(self.kube, delay=500)
@@ -66,29 +67,29 @@ def test_events_delay(self, mock_time):
self.assertEquals(2709, retr.last_resversion)

def test_namespace_serverside_filtering(self):
- with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
+ with patch.object(self.kube, 'retrieve_json_auth', return_value=MockResponse({}, 200)) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns'])
retr.get_event_array()
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/namespaces/testns/events', params={})

def test_namespace_clientside_filtering(self):
val = self._build_events([('ns1', 'k1'), ('ns2', 'k1'), ('testns', 'k1')])
- with patch.object(self.kube, 'retrieve_json_auth', return_value=val) as mock_method:
+ with patch.object(self.kube, 'retrieve_json_auth', return_value=MockResponse(val, 200)) as mock_method:
retr = KubeEventRetriever(self.kube, namespaces=['testns', 'ns2'])
events = retr.get_event_array()
self.assertEquals(2, len(events))
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/events', params={})

def test_kind_serverside_filtering(self):
- with patch.object(self.kube, 'retrieve_json_auth', return_value={}) as mock_method:
+ with patch.object(self.kube, 'retrieve_json_auth', return_value=MockResponse({}, 200)) as mock_method:
retr = KubeEventRetriever(self.kube, kinds=['k1'])
retr.get_event_array()
mock_method.assert_called_once_with('https://kubernetes:443/api/v1/events',
params={'fieldSelector': 'involvedObject.kind=k1'})

def test_kind_clientside_filtering(self):
val = self._build_events([('ns1', 'k1'), ('ns1', 'k1'), ('ns1', 'k2'), ('ns1', 'k3')])
- with patch.object(self.kube, 'retrieve_json_auth', return_value=val) as mock_method:
+ with patch.object(self.kube, 'retrieve_json_auth', return_value=MockResponse(val, 200)) as mock_method:
retr = KubeEventRetriever(self.kube, kinds=['k1', 'k2'])
events = retr.get_event_array()
self.assertEquals(3, len(events))
15 changes: 8 additions & 7 deletions tests/core/test_kube_pod_service_mapper.py
@@ -34,7 +34,7 @@ def test_init(self):
self.assertEqual(0, len(mapper._pod_services_mapping))

def test_service_cache_fill(self):
- jsons = self._load_json_array(['service_cache_services2.json'])
+ jsons = self._load_resp_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
mapper._fill_services_cache()
@@ -49,15 +49,15 @@ def test_service_cache_fill(self):
self.assertEqual('db', redis['tier'])

def test_pod_to_service_no_match(self):
- jsons = self._load_json_array(['service_cache_services2.json'])
+ jsons = self._load_resp_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
mapper._fill_services_cache()
no_match = self._build_pod_metadata(0, {'app': 'unknown'})
self.assertEqual(0, len(mapper.match_services_for_pod(no_match)))

def test_pod_to_service_two_matches(self):
- jsons = self._load_json_array(['service_cache_services2.json'])
+ jsons = self._load_resp_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
@@ -68,7 +68,7 @@ def test_pod_to_service_two_matches(self):
sorted(mapper.match_services_for_pod(two_matches, names=True)))

def test_pod_to_service_cache(self):
- jsons = self._load_json_array(['service_cache_services2.json'])
+ jsons = self._load_resp_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
two_matches = self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'})
@@ -79,8 +79,9 @@ def test_pod_to_service_cache(self):
sorted(mapper.match_services_for_pod({'uid': 0}, names=True)))

def test_pods_for_service(self):
- jsons = self._load_json_array(['service_cache_services2.json'])
+ jsons = self._load_resp_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):

# Fill pod label cache
mapper = PodServiceMapper(self.kube)
mapper.match_services_for_pod(self._build_pod_metadata(0, {'app': 'hello', 'tier': 'db'}))
@@ -117,7 +118,7 @@ def test_403_disable(self):
request_mock.assert_not_called()

def _prepare_events_tests(self, jsonfiles):
- jsons = self._load_json_array(jsonfiles)
+ jsons = self._load_resp_array(jsonfiles)
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
mapper = PodServiceMapper(self.kube)
# Fill pod label cache
@@ -161,7 +162,7 @@ def test_event_service_created_invalidation(self):

event = {'involvedObject': {'kind': 'Service', 'uid': ALL_HELLO_UID},
'reason': 'CreatedLoadBalancer'}
- jsons = self._load_json_array(['service_cache_services2.json'])
+ jsons = self._load_resp_array(['service_cache_services2.json'])
with patch.object(self.kube, 'retrieve_json_auth', side_effect=jsons):
# Three pods must be reloaded
self.assertEqual(set([0, 1, 3]), mapper.process_events([event]))
5 changes: 5 additions & 0 deletions tests/core/test_kubeutil.py
@@ -8,6 +8,7 @@

# project
from utils.kubernetes import KubeUtil
+ from .test_orchestrator import MockResponse

class KubeTestCase(unittest.TestCase):
# Patch _locate_kubelet that is used by KubeUtil.__init__
@@ -28,6 +29,10 @@ def _load_json_array(cls, names):
json_array.append(json.load(data_file))
return json_array

+ @classmethod
+ def _load_resp_array(cls, names):
+ json_array = cls._load_json_array(names)
+ return map(lambda x: MockResponse(x, 200), json_array)

class TestKubeUtilDeploymentTag(KubeTestCase):
def test_deployment_name_nominal(self):
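MockResponse comes from tests/core/test_orchestrator.py, which is not part of this diff. As a rough guide to the interface the updated tests rely on, a minimal stand-in might look like the sketch below; this is an assumption about its shape, not the actual class.

# Hypothetical minimal stand-in for tests.core.test_orchestrator.MockResponse,
# shown only to illustrate the interface exercised by the tests above.
class MockResponse(object):
    def __init__(self, json_data, status_code):
        self._json_data = json_data      # parsed body returned by .json()
        self.status_code = status_code   # simulated HTTP status code

    def json(self):
        return self._json_data

With a stand-in like this, _load_resp_array(['service_cache_services2.json']) yields objects whose .json() call returns the fixture content, matching the new retrieve_json_auth(...).json() call site in kube_event_retriever.py.
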
161 changes: 161 additions & 0 deletions tests/core/test_leader_elector.py
@@ -0,0 +1,161 @@
# stdlib
import datetime
import time

# 3rd party
import mock

# project
from tests.core.test_kubeutil import KubeTestCase
from utils.kubernetes.leader_elector import LeaderElector, \
ACQUIRE_TIME_ANNOTATION, CREATOR_ANNOTATION, CM_NAME


class TestLeaderElector(KubeTestCase):
def test_is_cm_mine(self):
elector = LeaderElector(self.kube)
self.kube.host_name = 'foo'

error_cm = [
({}, KeyError),
({'metadata': {}}, KeyError),
]
for cm, ex in error_cm:
self.assertRaises(ex, elector._is_cm_mine, cm)

cm = {'metadata': {'annotations': {CREATOR_ANNOTATION: 'foo'}}}
self.assertEqual(elector._is_cm_mine(cm), True)
cm = {'metadata': {'annotations': {}}}
self.assertEqual(elector._is_cm_mine(cm), False)
cm = {'metadata': {'annotations': {CREATOR_ANNOTATION: 'bar'}}}
self.assertEqual(elector._is_cm_mine(cm), False)

def test_build_update_cm_payload(self):
now = datetime.datetime.utcnow()
self.kube.host_name = 'foo'
elector = LeaderElector(self.kube)

cm = {
'kind': 'ConfigMap',
'apiVersion': 'v1',
'data': {},
'metadata': {
'name': 'datadog-leader-elector',
'namespace': 'default',
'resourceVersion': '5563782',
'creationTimestamp': '2017-08-21T17:37:32Z',
'annotations': {'acquired_time': datetime.datetime.strftime(now, "%Y-%m-%dT%H:%M:%S.%f"),
'creator': 'dd-agent-284pl'
},
'selfLink': '/api/v1/namespaces/default/configmaps/datadog-leader-elector',
'uid': '697b957c-8697-11e7-b62f-42010af002d4'
},
}
time.sleep(1)
pl = elector._build_update_cm_payload(cm)
self.assertEqual(pl['data'], cm['data'])
self.assertEqual(pl['metadata']['name'], cm['metadata']['name'])
self.assertEqual(pl['metadata']['namespace'], cm['metadata']['namespace'])
self.assertEqual(pl['metadata']['annotations'][CREATOR_ANNOTATION], cm['metadata']['annotations'][CREATOR_ANNOTATION])
self.assertTrue(pl['metadata']['annotations'][ACQUIRE_TIME_ANNOTATION] > cm['metadata']['annotations'][ACQUIRE_TIME_ANNOTATION])

def test_build_create_cm_payload(self):
now = datetime.datetime.utcnow()
self.kube.host_name = 'foo'
elector = LeaderElector(self.kube)

cm = {
'data': {},
'metadata': {
'name': CM_NAME,
'namespace': 'default',
'annotations': {
CREATOR_ANNOTATION: 'foo',
ACQUIRE_TIME_ANNOTATION: datetime.datetime.strftime(now, "%Y-%m-%dT%H:%M:%S.%f")
}
}
}
pl = elector._build_create_cm_payload()
self.assertEqual(pl['data'], cm['data'])
self.assertEqual(pl['metadata']['name'], cm['metadata']['name'])
self.assertEqual(pl['metadata']['namespace'], cm['metadata']['namespace'])
self.assertEqual(pl['metadata']['annotations'][CREATOR_ANNOTATION], cm['metadata']['annotations'][CREATOR_ANNOTATION])
self.assertTrue(pl['metadata']['annotations'][ACQUIRE_TIME_ANNOTATION] >= cm['metadata']['annotations'][ACQUIRE_TIME_ANNOTATION])

def test_is_lock_expired(self):
elector = LeaderElector(self.kube)

cm = {
'kind': 'ConfigMap',
'apiVersion': 'v1',
'data': {},
'metadata': {
'name': 'datadog-leader-elector',
'namespace': 'default',
'resourceVersion': '5563782',
'creationTimestamp': '2017-08-21T17:37:32Z',
'annotations': {'acquired_time': '2017-08-21T17:37:32.514660',
'creator': 'dd-agent-284pl'
},
'selfLink': '/api/v1/namespaces/default/configmaps/datadog-leader-elector',
'uid': '697b957c-8697-11e7-b62f-42010af002d4'
}
}

self.assertTrue(elector._is_lock_expired(cm))
cm['metadata']['annotations'][ACQUIRE_TIME_ANNOTATION] = '3017-08-21T17:37:32.514660'
self.assertFalse(elector._is_lock_expired(cm))

@mock.patch.object(LeaderElector, '_get_cm', return_value=None)
@mock.patch.object(LeaderElector, '_try_lock_cm', return_value=False)
@mock.patch.object(LeaderElector, '_is_cm_mine', return_value=False)
@mock.patch.object(LeaderElector, '_is_lock_expired', return_value=True)
@mock.patch.object(LeaderElector, '_try_refresh_cm', return_value=True)
def test_try_refresh(self, m_refresh_cm, m_lock_expired, m_cm_mine, m_lock_cm, m_get_cm):
elector = LeaderElector(self.kube)

# First test uses the decorator return values
self.assertFalse(elector._try_refresh())

m_get_cm.return_value = {'some': 'thing'}

m_lock_expired.return_value = True
self.assertFalse(elector._try_refresh())

m_lock_cm.return_value = True
self.assertTrue(elector._try_refresh())

m_cm_mine.return_value = True
self.assertTrue(elector._try_refresh())

m_get_cm.return_value = None
m_lock_cm.return_value = False
m_refresh_cm.return_value = False
self.assertFalse(elector._try_refresh())

@mock.patch.object(LeaderElector, '_get_cm', return_value=None)
@mock.patch.object(LeaderElector, '_try_lock_cm', return_value=False)
@mock.patch.object(LeaderElector, '_is_cm_mine', return_value=False)
@mock.patch.object(LeaderElector, '_is_lock_expired', return_value=True)
def test_try_acquire(self, m_lock_expired, m_cm_mine, m_lock_cm, m_get_cm):
elector = LeaderElector(self.kube)

# First test uses the decorator return values
self.assertFalse(elector._try_acquire())

m_get_cm.return_value = {'some': 'thing'}
m_lock_cm.return_value = True
self.assertTrue(elector._try_acquire())

m_lock_cm.return_value = False
self.assertFalse(elector._try_acquire())

m_get_cm.return_value = None
m_lock_expired.return_value = True
self.assertFalse(elector._try_refresh())

m_lock_cm.return_value = True
self.assertTrue(elector._try_refresh())

m_lock_cm.return_value = True
self.assertTrue(elector._try_refresh())
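
These tests mock the ConfigMap helpers (_get_cm, _try_lock_cm, _is_cm_mine, _is_lock_expired, _try_refresh_cm) and only exercise the decision logic around them: the creator annotation records which agent holds the lock and acquired_time records when it was last taken. Read together, the mocked return values imply an acquire flow roughly like the sketch below; this is inferred from the tests, not copied from LeaderElector, and the free-standing helper is hypothetical.

def try_acquire_sketch(elector):
    """Approximate acquire flow implied by the mocked tests above."""
    cm = elector._get_cm()              # the datadog-leader-elector ConfigMap, or None
    if cm is None:
        return elector._try_lock_cm()   # no lock yet: try to create it for this agent
    if elector._is_cm_mine(cm):
        return True                     # this agent already holds the lock
    if elector._is_lock_expired(cm):
        return elector._try_lock_cm()   # lease expired: try to take the lock over
    return False                        # another agent holds a fresh lock
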
1 change: 1 addition & 0 deletions utils/kubernetes/__init__.py
@@ -2,6 +2,7 @@
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)

+ from .leader_elector import LeaderElector # noqa: F401
from .kube_event_retriever import KubeEventRetriever # noqa: F401
from .pod_service_mapper import PodServiceMapper # noqa: F401
from .kubeutil import detect_is_k8s # noqa: F401
11 changes: 7 additions & 4 deletions utils/kubernetes/kube_event_retriever.py
@@ -37,9 +37,8 @@ def __init__(self, kubeutil_object, namespaces=None, kinds=None, delay=None):
self.last_resversion = -1
self.set_namespaces(namespaces)
self.set_kinds(kinds)
+ self.set_delay(delay)

- # Request throttling to reduce apiserver traffic
- self._request_interval = delay
self._last_lookup_timestamp = -1

def set_namespaces(self, namespaces):
@@ -66,10 +65,14 @@ def set_kinds(self, kinds):
if isinstance(kinds, basestring):
self.request_params['fieldSelector'] = 'involvedObject.kind=' + kinds

+ def set_delay(self, delay):
+ """Request throttling to reduce apiserver traffic"""
+ self._request_interval = delay

def get_event_array(self):
"""
Fetch latest events from the apiserver for the namespaces and kinds set on init
- and returns an array of event objects
+ and returns an array of event objects.
"""

# Request throttling
@@ -82,7 +85,7 @@ def get_event_array(self):
lastest_resversion = None
filtered_events = []

- events = self.kubeutil.retrieve_json_auth(self.request_url, params=self.request_params)
+ events = self.kubeutil.retrieve_json_auth(self.request_url, params=self.request_params).json()

for event in events.get('items', []):
resversion = int(event.get('metadata', {}).get('resourceVersion', None))
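
The throttling code behind the "# Request throttling" comment is collapsed in this view. Based on the fields set in __init__ (_request_interval via set_delay, _last_lookup_timestamp starting at -1) and on test_events_delay above, the guard presumably behaves like the hypothetical helper below; the name and exact placement inside get_event_array are assumptions.

import time

def should_query_apiserver(retriever):
    """Hypothetical guard: only hit the apiserver once the delay window has elapsed."""
    now = time.time()
    if retriever._request_interval is not None and \
            now - retriever._last_lookup_timestamp < retriever._request_interval:
        return False   # within the throttle window; get_event_array() would return [] here
    retriever._last_lookup_timestamp = now
    return True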