Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

fixing flipped sign in expiry time padding #55

Merged
merged 1 commit into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _create_temp_file_with_content(content):


def _is_expired(expiry):
return ((parse_rfc3339(expiry) + EXPIRY_SKEW_PREVENTION_DELAY) <=
return ((parse_rfc3339(expiry) - EXPIRY_SKEW_PREVENTION_DELAY) <=
datetime.datetime.utcnow().replace(tzinfo=UTC))


Expand Down
49 changes: 37 additions & 12 deletions config/kube_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,38 @@

import mock
import yaml
from six import PY3
from six import PY3, next

from .config_exception import ConfigException
from .dateutil import parse_rfc3339
from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
_cleanup_temp_files, _create_temp_file_with_content,
list_kube_config_contexts, load_kube_config,
new_client_from_config)

BEARER_TOKEN_FORMAT = "Bearer %s"

EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY
EXPIRY_TIMEDELTA = 2

NON_EXISTING_FILE = "zz_non_existing_file_472398324"


def _base64(string):
return base64.encodestring(string.encode()).decode()


def _format_expiry_datetime(dt):
return dt.strftime(EXPIRY_DATETIME_FORMAT)


def _get_expiry(loader):
expired_gcp_conf = (item for item in loader._config.value.get("users")
if item.get("name") == "expired_gcp")
return next(expired_gcp_conf).get("user").get("auth-provider") \
.get("config").get("expiry")


def _raise_exception(st):
raise Exception(st)

Expand All @@ -59,6 +73,8 @@ def _raise_exception(st):
TEST_PASSWORD = "pass"
# token for me:pass
TEST_BASIC_TOKEN = "Basic bWU6cGFzcw=="
TEST_TOKEN_EXPIRY = _format_expiry_datetime(
datetime.datetime.utcnow() - datetime.timedelta(minutes=EXPIRY_TIMEDELTA))

TEST_SSL_HOST = "https://test-host"
TEST_CERTIFICATE_AUTH = "cert-auth"
Expand Down Expand Up @@ -194,10 +210,12 @@ class TestConfigNode(BaseTestCase):
{"name": "test_name2",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}],
"with_names_dup": [{"name": "test_name", "value": "test_value"},
{"name": "test_name",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}]}
"with_names_dup": [
{"name": "test_name", "value": "test_value"},
{"name": "test_name",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}
]}

def setUp(self):
super(TestConfigNode, self).setUp()
Expand All @@ -213,7 +231,8 @@ def test_normal_map_array_operations(self):
self.assertEqual(3, len(self.node['key2']))

self.assertEqual("test_obj/key3", self.node['key3'].name)
self.assertEqual({"inner_key": "inner_value"}, self.node['key3'].value)
self.assertEqual({"inner_key": "inner_value"},
self.node['key3'].value)
self.assertEqual("inner_value", self.node['key3']["inner_key"])
self.assertEqual(1, len(self.node['key3']))

Expand Down Expand Up @@ -255,7 +274,8 @@ def test_get_with_name_on_name_does_not_exists(self):
def test_get_with_name_on_duplicate_name(self):
self.expect_exception(
lambda: self.node['with_names_dup'].get_with_name('test_name'),
"Expected only one object with name test_name in test_obj/with_names_dup list")
"Expected only one object with name test_name in "
"test_obj/with_names_dup list")


class FakeConfig:
Expand Down Expand Up @@ -421,7 +441,8 @@ class TestKubeConfigLoader(BaseTestCase):
"name": "ssl",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64,
"certificate-authority-data":
TEST_CERTIFICATE_AUTH_BASE64,
}
},
{
Expand Down Expand Up @@ -462,7 +483,7 @@ class TestKubeConfigLoader(BaseTestCase):
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
"expiry": "2000-01-01T12:00:00Z", # always in past
"expiry": TEST_TOKEN_EXPIRY, # always in past
}
},
"token": TEST_DATA_BASE64, # should be ignored
Expand Down Expand Up @@ -492,7 +513,8 @@ class TestKubeConfigLoader(BaseTestCase):
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-certificate-authority-data": TEST_OIDC_CA,
"idp-issuer-url": "https://example.org/identity",
"refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
Expand Down Expand Up @@ -578,7 +600,6 @@ def test_load_gcp_token_no_refresh(self):
loader.token)

def test_load_gcp_token_with_refresh(self):

def cred(): return None
cred.token = TEST_ANOTHER_DATA_BASE64
cred.expiry = datetime.datetime.now()
Expand All @@ -587,7 +608,11 @@ def cred(): return None
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_gcp",
get_google_credentials=lambda: cred)
original_expiry = _get_expiry(loader)
self.assertTrue(loader._load_gcp_token())
new_expiry = _get_expiry(loader)
# assert that the configs expiry actually updates
self.assertTrue(new_expiry > original_expiry)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
loader.token)

Expand Down