Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Automated cherry pick of #79 #146

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions config/kube_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,31 @@ def _load_oid_token(self, provider):
if 'config' not in provider:
return

parts = provider['config']['id-token'].split('.')
reserved_characters = frozenset(["=", "+", "/"])
token = provider['config']['id-token']

if any(char in token for char in reserved_characters):
# Invalid jwt, as it contains url-unsafe chars
return

parts = token.split('.')
if len(parts) != 3: # Not a valid JWT
return None
return

padding = (4 - len(parts[1]) % 4) * '='
if len(padding) == 3:
# According to spec, 3 padding characters cannot occur
# in a valid jwt
# https://tools.ietf.org/html/rfc7515#appendix-C
return

if PY3:
jwt_attributes = json.loads(
base64.b64decode(parts[1]).decode('utf-8')
base64.b64decode(parts[1] + padding).decode('utf-8')
)
else:
jwt_attributes = json.loads(
base64.b64decode(parts[1] + "==")
base64.b64decode(parts[1] + padding)
)

expire = jwt_attributes.get('exp')
Expand Down
101 changes: 97 additions & 4 deletions config/kube_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ def _base64(string):
return base64.standard_b64encode(string.encode()).decode()


def _urlsafe_unpadded_b64encode(string):
return base64.urlsafe_b64encode(string.encode()).decode().rstrip('=')


def _format_expiry_datetime(dt):
return dt.strftime(EXPIRY_DATETIME_FORMAT)

Expand Down Expand Up @@ -96,12 +100,33 @@ def _raise_exception(st):

TEST_OIDC_TOKEN = "test-oidc-token"
TEST_OIDC_INFO = "{\"name\": \"test\"}"
TEST_OIDC_BASE = _base64(TEST_OIDC_TOKEN) + "." + _base64(TEST_OIDC_INFO)
TEST_OIDC_LOGIN = TEST_OIDC_BASE + "." + TEST_CLIENT_CERT_BASE64
TEST_OIDC_BASE = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
_urlsafe_unpadded_b64encode(TEST_OIDC_INFO)
])
TEST_OIDC_LOGIN = ".".join([
TEST_OIDC_BASE,
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT_BASE64)
])
TEST_OIDC_TOKEN = "Bearer %s" % TEST_OIDC_LOGIN
TEST_OIDC_EXP = "{\"name\": \"test\",\"exp\": 536457600}"
TEST_OIDC_EXP_BASE = _base64(TEST_OIDC_TOKEN) + "." + _base64(TEST_OIDC_EXP)
TEST_OIDC_EXPIRED_LOGIN = TEST_OIDC_EXP_BASE + "." + TEST_CLIENT_CERT_BASE64
TEST_OIDC_EXP_BASE = _urlsafe_unpadded_b64encode(
TEST_OIDC_TOKEN) + "." + _urlsafe_unpadded_b64encode(TEST_OIDC_EXP)
TEST_OIDC_EXPIRED_LOGIN = ".".join([
TEST_OIDC_EXP_BASE,
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_CONTAINS_RESERVED_CHARACTERS = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
_urlsafe_unpadded_b64encode(TEST_OIDC_INFO).replace("a", "+"),
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])
TEST_OIDC_INVALID_PADDING_LENGTH = ".".join([
_urlsafe_unpadded_b64encode(TEST_OIDC_TOKEN),
"aaaaa",
_urlsafe_unpadded_b64encode(TEST_CLIENT_CERT)
])

TEST_OIDC_CA = _base64(TEST_CERTIFICATE_AUTH)


Expand Down Expand Up @@ -408,6 +433,22 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "expired_oidc_nocert"
}
},
{
"name": "oidc_contains_reserved_character",
"context": {
"cluster": "default",
"user": "oidc_contains_reserved_character"

}
},
{
"name": "oidc_invalid_padding_length",
"context": {
"cluster": "default",
"user": "oidc_invalid_padding_length"

}
},
{
"name": "user_pass",
"context": {
Expand Down Expand Up @@ -594,6 +635,38 @@ class TestKubeConfigLoader(BaseTestCase):
}
}
},
{
"name": "oidc_contains_reserved_character",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_CONTAINS_RESERVED_CHARACTERS,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "oidc_invalid_padding_length",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_INVALID_PADDING_LENGTH,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "user_pass",
"user": {
Expand Down Expand Up @@ -792,6 +865,26 @@ def test_oidc_with_refresh_nocert(
self.assertTrue(loader._load_auth_provider_token())
self.assertEqual("Bearer abc123", loader.token)

def test_oidc_fails_if_contains_reserved_chars(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc_contains_reserved_character",
)
self.assertEqual(
loader._load_oid_token("oidc_contains_reserved_character"),
None,
)

def test_oidc_fails_if_invalid_padding_length(self):
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="oidc_invalid_padding_length",
)
self.assertEqual(
loader._load_oid_token("oidc_invalid_padding_length"),
None,
)

def test_user_pass(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
actual = FakeConfig()
Expand Down