diff --git a/config/kube_config.py b/config/kube_config.py
index b1e2136e..5698a5c6 100644
--- a/config/kube_config.py
+++ b/config/kube_config.py
@@ -255,22 +255,27 @@ def _load_oid_token(self):
 return self.token
 def _refresh_oidc(self, provider):
- ca_cert = tempfile.NamedTemporaryFile(delete=True)
+ config = Configuration()
- if PY3:
- cert = base64.b64decode(
- provider['config']['idp-certificate-authority-data']
- ).decode('utf-8')
- else:
- cert = base64.b64decode(
- provider['config']['idp-certificate-authority-data'] + "=="
- )
+ if 'idp-certificate-authority-data' in provider['config']:
+ ca_cert = tempfile.NamedTemporaryFile(delete=True)
- with open(ca_cert.name, 'w') as fh:
- fh.write(cert)
+ if PY3:
+ cert = base64.b64decode(
+ provider['config']['idp-certificate-authority-data']
+ ).decode('utf-8')
+ else:
+ cert = base64.b64decode(
+ provider['config']['idp-certificate-authority-data'] + "=="
+ )
- config = Configuration()
- config.ssl_ca_cert = ca_cert.name
+ with open(ca_cert.name, 'w') as fh:
+ fh.write(cert)
+
+ config.ssl_ca_cert = ca_cert.name
+
+ else:
+ config.verify_ssl = False
 client = ApiClient(configuration=config)
@@ -301,7 +306,7 @@ def _refresh_oidc(self, provider):
 refresh_token=provider['config']['refresh-token'],
 auth=(provider['config']['client-id'],
 provider['config']['client-secret']),
- verify=ca_cert.name
+ verify=config.ssl_ca_cert if config.verify_ssl else None
 )
 except oauthlib.oauth2.rfc6749.errors.InvalidClientIdError:
 return
diff --git a/config/kube_config_test.py b/config/kube_config_test.py
index 5eb4c332..0ad3c66b 100644
--- a/config/kube_config_test.py
+++ b/config/kube_config_test.py
@@ -373,6 +373,13 @@ class TestKubeConfigLoader(BaseTestCase):
 "user": "expired_oidc"
 }
 },
+ {
+ "name": "expired_oidc_nocert",
+ "context": {
+ "cluster": "default",
+ "user": "expired_oidc_nocert"
+ }
+ },
 {
 "name": "user_pass",
 "context": {
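Review bot: The new `else:` branch sets `config.verify_ssl = False` whenever the provider has no `idp-certificate-authority-data`, so the OIDC discovery request made through `ApiClient` runs with TLS verification disabled and the client secret and refresh token are then sent to an endpoint whose identity was never checked. A missing IdP CA entry means the issuer's certificate should chain to the system trust store, not that verification should be skipped; drop the `else:` branch and leave `verify_ssl` at its default, and in the second hunk pass `verify=config.ssl_ca_cert if config.ssl_ca_cert else True`. As written the two hunks also disagree with each other: if I read requests' keyword merging correctly, `verify=None` in the second hunk falls back to the session default, which verifies, so the token refresh verifies while the discovery call does not. `_refresh_oidc` starts above the first hunk and continues between and past both hunks.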
@@ -519,6 +526,22 @@ class TestKubeConfigLoader(BaseTestCase):
 }
 }
 },
+ {
+ "name": "expired_oidc_nocert",
+ "user": {
+ "auth-provider": {
+ "name": "oidc",
+ "config": {
+ "client-id": "tectonic-kubectl",
+ "client-secret": "FAKE_SECRET",
+ "id-token": TEST_OIDC_EXPIRED_LOGIN,
+ "idp-issuer-url": "https://example.org/identity",
+ "refresh-token":
+ "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
+ }
+ }
+ }
+ },
 {
 "name": "user_pass",
 "user": {
@@ -649,6 +672,32 @@ def test_oidc_with_refresh(self, mock_ApiClient, mock_OAuth2Session):
 self.assertTrue(loader._load_oid_token())
 self.assertEqual("Bearer abc123", loader.token)
+ @mock.patch('kubernetes.config.kube_config.OAuth2Session.refresh_token')
+ @mock.patch('kubernetes.config.kube_config.ApiClient.request')
+ def test_oidc_with_refresh_nocert(
+ self, mock_ApiClient, mock_OAuth2Session):
+ mock_response = mock.MagicMock()
+ type(mock_response).status = mock.PropertyMock(
+ return_value=200
+ )
+ type(mock_response).data = mock.PropertyMock(
+ return_value=json.dumps({
+ "token_endpoint": "https://example.org/identity/token"
+ })
+ )
+
+ mock_ApiClient.return_value = mock_response
+
+ mock_OAuth2Session.return_value = {"id_token": "abc123",
+ "refresh_token": "newtoken123"}
+
+ loader = KubeConfigLoader(
+ config_dict=self.TEST_KUBE_CONFIG,
+ active_context="expired_oidc_nocert",
+ )
+ self.assertTrue(loader._load_oid_token())
+ self.assertEqual("Bearer abc123", loader.token)
+
 def test_user_pass(self):
 expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
 actual = FakeConfig()
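Review bot: `test_oidc_with_refresh_nocert` asserts only the resulting token, which `test_oidc_with_refresh` already covers, so it never pins the behaviour the fixture without `idp-certificate-authority-data` is meant to exercise; with both `ApiClient.request` and `OAuth2Session.refresh_token` mocked, the TLS verification settings the loader chose are not observed at all. Add an assertion on the `verify` keyword the refresh call received, e.g. `self.assertIn('verify', mock_OAuth2Session.call_args[1])` followed by an equality check against the value the loader is expected to pass for this context, so a regression in that branch fails the test. The body otherwise duplicates `test_oidc_with_refresh` line for line apart from `active_context`; a small helper taking the context name would keep the two tests in step. `test_user_pass` continues past the end of the hunk.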