diff --git a/config/kube_config.py b/config/kube_config.py index 58c96255..b1e2136e 100644 --- a/config/kube_config.py +++ b/config/kube_config.py @@ -63,7 +63,7 @@ def _create_temp_file_with_content(content): def _is_expired(expiry): - return ((parse_rfc3339(expiry) + EXPIRY_SKEW_PREVENTION_DELAY) <= + return ((parse_rfc3339(expiry) - EXPIRY_SKEW_PREVENTION_DELAY) <= datetime.datetime.utcnow().replace(tzinfo=UTC)) diff --git a/config/kube_config_test.py b/config/kube_config_test.py index 4ffca9fa..5eb4c332 100644 --- a/config/kube_config_test.py +++ b/config/kube_config_test.py @@ -22,10 +22,9 @@ import mock import yaml -from six import PY3 +from six import PY3, next from .config_exception import ConfigException -from .dateutil import parse_rfc3339 from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader, _cleanup_temp_files, _create_temp_file_with_content, list_kube_config_contexts, load_kube_config, @@ -33,6 +32,10 @@ BEARER_TOKEN_FORMAT = "Bearer %s" +EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +# should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY +EXPIRY_TIMEDELTA = 2 + NON_EXISTING_FILE = "zz_non_existing_file_472398324" @@ -40,6 +43,17 @@ def _base64(string): return base64.encodestring(string.encode()).decode() +def _format_expiry_datetime(dt): + return dt.strftime(EXPIRY_DATETIME_FORMAT) + + +def _get_expiry(loader): + expired_gcp_conf = (item for item in loader._config.value.get("users") + if item.get("name") == "expired_gcp") + return next(expired_gcp_conf).get("user").get("auth-provider") \ + .get("config").get("expiry") + + def _raise_exception(st): raise Exception(st) @@ -59,6 +73,8 @@ def _raise_exception(st): TEST_PASSWORD = "pass" # token for me:pass TEST_BASIC_TOKEN = "Basic bWU6cGFzcw==" +TEST_TOKEN_EXPIRY = _format_expiry_datetime( + datetime.datetime.utcnow() - datetime.timedelta(minutes=EXPIRY_TIMEDELTA)) TEST_SSL_HOST = "https://test-host" TEST_CERTIFICATE_AUTH = "cert-auth" @@ -194,10 +210,12 @@ class TestConfigNode(BaseTestCase): {"name": "test_name2", "value": {"key1", "test"}}, {"name": "test_name3", "value": [1, 2, 3]}], - "with_names_dup": [{"name": "test_name", "value": "test_value"}, - {"name": "test_name", - "value": {"key1", "test"}}, - {"name": "test_name3", "value": [1, 2, 3]}]} + "with_names_dup": [ + {"name": "test_name", "value": "test_value"}, + {"name": "test_name", + "value": {"key1", "test"}}, + {"name": "test_name3", "value": [1, 2, 3]} + ]} def setUp(self): super(TestConfigNode, self).setUp() @@ -213,7 +231,8 @@ def test_normal_map_array_operations(self): self.assertEqual(3, len(self.node['key2'])) self.assertEqual("test_obj/key3", self.node['key3'].name) - self.assertEqual({"inner_key": "inner_value"}, self.node['key3'].value) + self.assertEqual({"inner_key": "inner_value"}, + self.node['key3'].value) self.assertEqual("inner_value", self.node['key3']["inner_key"]) self.assertEqual(1, len(self.node['key3'])) @@ -255,7 +274,8 @@ def test_get_with_name_on_name_does_not_exists(self): def test_get_with_name_on_duplicate_name(self): self.expect_exception( lambda: self.node['with_names_dup'].get_with_name('test_name'), - "Expected only one object with name test_name in test_obj/with_names_dup list") + "Expected only one object with name test_name in " + "test_obj/with_names_dup list") class FakeConfig: @@ -421,7 +441,8 @@ class TestKubeConfigLoader(BaseTestCase): "name": "ssl", "cluster": { "server": TEST_SSL_HOST, - "certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64, + "certificate-authority-data": + TEST_CERTIFICATE_AUTH_BASE64, } }, { @@ -462,7 +483,7 @@ class TestKubeConfigLoader(BaseTestCase): "name": "gcp", "config": { "access-token": TEST_DATA_BASE64, - "expiry": "2000-01-01T12:00:00Z", # always in past + "expiry": TEST_TOKEN_EXPIRY, # always in past } }, "token": TEST_DATA_BASE64, # should be ignored @@ -492,7 +513,8 @@ class TestKubeConfigLoader(BaseTestCase): "id-token": TEST_OIDC_EXPIRED_LOGIN, "idp-certificate-authority-data": TEST_OIDC_CA, "idp-issuer-url": "https://example.org/identity", - "refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" + "refresh-token": + "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk" } } } @@ -578,7 +600,6 @@ def test_load_gcp_token_no_refresh(self): loader.token) def test_load_gcp_token_with_refresh(self): - def cred(): return None cred.token = TEST_ANOTHER_DATA_BASE64 cred.expiry = datetime.datetime.now() @@ -587,7 +608,11 @@ def cred(): return None config_dict=self.TEST_KUBE_CONFIG, active_context="expired_gcp", get_google_credentials=lambda: cred) + original_expiry = _get_expiry(loader) self.assertTrue(loader._load_gcp_token()) + new_expiry = _get_expiry(loader) + # assert that the configs expiry actually updates + self.assertTrue(new_expiry > original_expiry) self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64, loader.token)