From 7ee5e6a43902f0c5ef889ca7b3528d54dc92716c Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 3 Mar 2016 10:18:16 -0800 Subject: [PATCH] Removing crypto code and just relying on oauth2client helpers. Fixes #1408. --- gcloud/credentials.py | 120 ++---------- gcloud/test_credentials.py | 392 ++----------------------------------- setup.py | 2 +- 3 files changed, 30 insertions(+), 484 deletions(-) diff --git a/gcloud/credentials.py b/gcloud/credentials.py index da88afb906a1..11af91d54e69 100644 --- a/gcloud/credentials.py +++ b/gcloud/credentials.py @@ -19,27 +19,7 @@ import six from six.moves.urllib.parse import urlencode # pylint: disable=F0401 -try: - from OpenSSL import crypto -except ImportError: # pragma: NO COVER - # pyOpenSSL can't be installed on App Engine, but it will not - # be needed there since app_identity is used. - crypto = None - from oauth2client import client -from oauth2client import crypt -from oauth2client.service_account import ServiceAccountCredentials -try: - from oauth2client.contrib.appengine import ( - AppAssertionCredentials as _GAECreds) -except ImportError: - class _GAECreds(object): - """Dummy class if not in App Engine environment.""" - -try: - from google.appengine.api import app_identity -except ImportError: - app_identity = None from gcloud._helpers import UTC from gcloud._helpers import _NOW @@ -102,95 +82,10 @@ def get_credentials(): return client.GoogleCredentials.get_application_default() -def _get_pem_key(credentials): - """Gets private key for a PEM payload from a credentials object. - - :type credentials: :class:`service_account.ServiceAccountCredentials`, - :param credentials: The credentials used to create a private key - for signing text. - - :rtype: :class:`OpenSSL.crypto.PKey` - :returns: A PKey object used to sign text. - :raises: `TypeError` if `credentials` is the wrong type. - `EnvironmentError` if `crypto` did not import successfully. - """ - if isinstance(credentials, ServiceAccountCredentials): - if credentials._private_key_pkcs12 is not None: - # Take our PKCS12 (.p12) text and convert to PEM text. - pem_text = crypt.pkcs12_key_as_pem( - credentials._private_key_pkcs12, - credentials._private_key_password) - else: - pem_text = credentials._private_key_pkcs8_pem - else: - raise TypeError((credentials, - 'not a valid service account credentials type')) - - if crypto is None: - raise EnvironmentError( - 'pyOpenSSL must be installed to load a private key') - return crypto.load_privatekey(crypto.FILETYPE_PEM, pem_text) - - -def _get_signature_bytes(credentials, string_to_sign): - """Uses crypto attributes of credentials to sign a string/bytes. - - :type credentials: :class:`service_account.ServiceAccountCredentials`, - :class:`_GAECreds` - :param credentials: The credentials used for signing text (typically - involves the creation of a PKey). - - :type string_to_sign: string - :param string_to_sign: The string to be signed by the credentials. - - :rtype: bytes - :returns: Signed bytes produced by the credentials. - :raises: `EnvironmentError` if `crypto` did not import successfully. - """ - if isinstance(credentials, _GAECreds): - _, signed_bytes = app_identity.sign_blob(string_to_sign) - return signed_bytes - else: - # Sign the string with the PKey. - pkey = _get_pem_key(credentials) - if not isinstance(string_to_sign, six.binary_type): - string_to_sign = string_to_sign.encode('utf-8') - if crypto is None: - raise EnvironmentError( - 'pyOpenSSL must be installed to sign content using a ' - 'private key') - return crypto.sign(pkey, string_to_sign, 'SHA256') - - -def _get_service_account_name(credentials): - """Determines service account name from a credentials object. - - :type credentials: :class:`service_account.ServiceAccountCredentials`, - :class:`_GAECreds` - :param credentials: The credentials used to determine the service - account name. - - :rtype: string - :returns: Service account name associated with the credentials. - :raises: :class:`ValueError` if the credentials are not a valid service - account type. - """ - service_account_name = None - if isinstance(credentials, ServiceAccountCredentials): - service_account_name = credentials.service_account_email - elif isinstance(credentials, _GAECreds): - service_account_name = app_identity.get_service_account_name() - - if service_account_name is None: - raise ValueError('Service account name could not be determined ' - 'from credentials') - return service_account_name - - def _get_signed_query_params(credentials, expiration, string_to_sign): """Gets query parameters for creating a signed URL. - :type credentials: :class:`service_account.ServiceAccountCredentials` + :type credentials: :class:`oauth2client.client.AssertionCredentials` :param credentials: The credentials used to create a private key for signing text. @@ -204,9 +99,9 @@ def _get_signed_query_params(credentials, expiration, string_to_sign): :returns: Query parameters matching the signing credentials with a signed payload. """ - signature_bytes = _get_signature_bytes(credentials, string_to_sign) + _, signature_bytes = credentials.sign_blob(string_to_sign) signature = base64.b64encode(signature_bytes) - service_account_name = _get_service_account_name(credentials) + service_account_name = credentials.service_account_email return { 'GoogleAccessId': service_account_name, 'Expires': str(expiration), @@ -246,6 +141,15 @@ def generate_signed_url(credentials, resource, expiration, response_disposition=None, generation=None): """Generate signed URL to provide query-string auth'n to a resource. + .. note:: + + Assumes ``credentials`` implements a ``sign_blob()`` method that takes + bytes to sign and returns a pair of the key ID (unused here) and the + signed bytes (this is abstract in the base class + :class:`oauth2client.client.AssertionCredentials`). Also assumes + ``credentials`` has a ``service_account_email`` property which + identifies the credentials. + .. note:: If you are on Google Compute Engine, you can't generate a signed URL. diff --git a/gcloud/test_credentials.py b/gcloud/test_credentials.py index 45b85aa4336f..e4108cff11c9 100644 --- a/gcloud/test_credentials.py +++ b/gcloud/test_credentials.py @@ -15,53 +15,6 @@ import unittest2 -def _setup_appengine_import(test_case, app_identity): - import sys - import types - - GOOGLE = types.ModuleType('google') - GAE = types.ModuleType('appengine') - GAE_API = types.ModuleType('api') - GAE_EXT = types.ModuleType('ext') - GAE_EXT_WEBAPP = types.ModuleType('webapp') - GAE_EXT_WEBAPP_UTIL = types.ModuleType('util') - - GOOGLE.appengine = GAE - GAE.api = GAE_API - GAE.api.app_identity = app_identity - GAE.api.memcache = None - GAE.api.users = None - GAE.ext = GAE_EXT - GAE.ext.db = _MockDB - GAE.ext.webapp = GAE_EXT_WEBAPP - GAE.ext.webapp.util = GAE_EXT_WEBAPP_UTIL - GAE.ext.webapp.util.login_required = None - GAE.ext.webapp.util.run_wsgi_app = None - - test_case._PREV_GOOGLE_MODULE = sys.modules['google'] - - sys.modules['google'] = GOOGLE - sys.modules['google.appengine'] = GAE - sys.modules['google.appengine.api'] = GAE_API - sys.modules['google.appengine.ext'] = GAE_EXT - sys.modules['google.appengine.ext.webapp'] = GAE_EXT_WEBAPP - sys.modules['google.appengine.ext.webapp.util'] = GAE_EXT_WEBAPP_UTIL - sys.modules['webapp2'] = GAE_EXT_WEBAPP - - -def _teardown_appengine_import(test_case): - import sys - sys.modules.pop('google') - sys.modules.pop('google.appengine') - sys.modules.pop('google.appengine.api') - sys.modules.pop('google.appengine.ext') - sys.modules.pop('google.appengine.ext.webapp') - sys.modules.pop('google.appengine.ext.webapp.util') - sys.modules.pop('webapp2') - - sys.modules['google'] = test_case._PREV_GOOGLE_MODULE - - class Test_get_credentials(unittest2.TestCase): def _callFUT(self): @@ -102,7 +55,7 @@ def _generate_helper(self, response_type=None, response_disposition=None, def _get_signed_query_params(*args): credentials, expiration = args[:2] return { - 'GoogleAccessId': credentials.service_account_name, + 'GoogleAccessId': credentials.service_account_email, 'Expires': str(expiration), 'Signature': SIGNED, } @@ -123,7 +76,7 @@ def _get_signed_query_params(*args): self.assertEqual(params.pop('Signature'), [SIGNED.decode('ascii')]) self.assertEqual(params.pop('Expires'), ['1000']) self.assertEqual(params.pop('GoogleAccessId'), - [_Credentials.service_account_name]) + [CREDENTIALS.service_account_email]) if response_type is not None: self.assertEqual(params.pop('response-content-type'), [response_type]) @@ -148,173 +101,6 @@ def test_w_custom_fields(self): generation=generation) -class Test__get_signature_bytes(unittest2.TestCase): - - def setUp(self): - SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' - self.APP_IDENTITY = _AppIdentity(SERVICE_ACCOUNT_NAME) - _setup_appengine_import(self, self.APP_IDENTITY) - - def tearDown(self): - _teardown_appengine_import(self) - - def _callFUT(self, credentials, string_to_sign): - from gcloud.credentials import _get_signature_bytes - return _get_signature_bytes(credentials, string_to_sign) - - def _run_with_fake_crypto(self, credentials, private_key_text, - string_to_sign): - import base64 - import six - from gcloud._testing import _Monkey - from gcloud import credentials as MUT - - crypt = _Crypt() - load_result = object() - sign_result = object() - openssl_crypto = _OpenSSLCrypto(load_result, sign_result) - - with _Monkey(MUT, crypt=crypt, crypto=openssl_crypto): - result = self._callFUT(credentials, string_to_sign) - - if crypt._pkcs12_key_as_pem_called: - self.assertEqual(crypt._private_key_text, private_key_text) - self.assertEqual(crypt._private_key_password, 'notasecret') - self.assertEqual(openssl_crypto._loaded, - [(openssl_crypto.FILETYPE_PEM, _Crypt._KEY)]) - else: - self.assertEqual(openssl_crypto._loaded, - [(openssl_crypto.FILETYPE_PEM, private_key_text)]) - - if not isinstance(string_to_sign, six.binary_type): - string_to_sign = string_to_sign.encode('utf-8') - self.assertEqual(openssl_crypto._signed, - [(load_result, string_to_sign, 'SHA256')]) - - self.assertEqual(result, sign_result) - - def test_p12_type(self): - from oauth2client.service_account import ServiceAccountCredentials - ACCOUNT_NAME = 'dummy_service_account_name' - PRIVATE_KEY_TEXT = b'dummy_private_key_text' - STRING_TO_SIGN = b'dummy_signature' - SIGNER = object() - CREDENTIALS = ServiceAccountCredentials( - ACCOUNT_NAME, SIGNER) - CREDENTIALS._private_key_pkcs12 = PRIVATE_KEY_TEXT - CREDENTIALS._private_key_password = 'notasecret' - self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, - STRING_TO_SIGN) - - def test_p12_type_non_bytes_to_sign(self): - from oauth2client.service_account import ServiceAccountCredentials - ACCOUNT_NAME = 'dummy_service_account_name' - PRIVATE_KEY_TEXT = b'dummy_private_key_text' - STRING_TO_SIGN = u'dummy_signature' - SIGNER = object() - CREDENTIALS = ServiceAccountCredentials( - ACCOUNT_NAME, SIGNER) - CREDENTIALS._private_key_pkcs12 = PRIVATE_KEY_TEXT - CREDENTIALS._private_key_password = 'notasecret' - self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, - STRING_TO_SIGN) - - def test_json_type(self): - from oauth2client import service_account - from gcloud._testing import _Monkey - - PRIVATE_KEY_TEXT = 'dummy_private_key_pkcs8_text' - STRING_TO_SIGN = b'dummy_signature' - SIGNER = object() - CREDENTIALS = service_account.ServiceAccountCredentials( - 'dummy_service_account_email', SIGNER) - CREDENTIALS._private_key_pkcs8_pem = PRIVATE_KEY_TEXT - self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, - STRING_TO_SIGN) - - def test_gae_type(self): - # Relies on setUp fixing up App Engine imports. - from oauth2client.contrib.appengine import AppAssertionCredentials - from gcloud._testing import _Monkey - from gcloud import credentials - - APP_IDENTITY = self.APP_IDENTITY - CREDENTIALS = AppAssertionCredentials([]) - STRING_TO_SIGN = b'STRING_TO_SIGN' - - with _Monkey(credentials, _GAECreds=AppAssertionCredentials, - app_identity=APP_IDENTITY): - signed_bytes = self._callFUT(CREDENTIALS, b'STRING_TO_SIGN') - - self.assertEqual(signed_bytes, STRING_TO_SIGN) - self.assertEqual(APP_IDENTITY._strings_signed, [STRING_TO_SIGN]) - - def test_without_pyopenssl(self): - from gcloud._testing import _Monkey - from gcloud import credentials as credentials_mod - - mock_called = [] - credentials = object() - - def mock_pem_key(local_creds): - mock_called.append(local_creds) - - with _Monkey(credentials_mod, crypto=None, _get_pem_key=mock_pem_key): - with self.assertRaises(EnvironmentError): - self._callFUT(credentials, b'STRING_TO_SIGN') - self.assertEqual(mock_called, [credentials]) - - -class Test__get_service_account_name(unittest2.TestCase): - - def setUp(self): - SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' - self.APP_IDENTITY = _AppIdentity(SERVICE_ACCOUNT_NAME) - _setup_appengine_import(self, self.APP_IDENTITY) - - def tearDown(self): - _teardown_appengine_import(self) - - def _callFUT(self, credentials): - from gcloud.credentials import _get_service_account_name - return _get_service_account_name(credentials) - - def test_bad_type(self): - from oauth2client.client import OAuth2Credentials - CREDENTIALS = OAuth2Credentials('bogus_token', 'bogus_id', - 'bogus_secret', 'bogus_refresh', - None, None, None) - self.assertRaises(ValueError, self._callFUT, CREDENTIALS) - - def test_service_account_type(self): - from oauth2client import service_account - - SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' - SIGNER = object() - CREDENTIALS = service_account.ServiceAccountCredentials( - SERVICE_ACCOUNT_NAME, SIGNER) - - found = self._callFUT(CREDENTIALS) - self.assertEqual(found, SERVICE_ACCOUNT_NAME) - - def test_gae_type(self): - # Relies on setUp fixing up App Engine imports. - from oauth2client.contrib.appengine import AppAssertionCredentials - from gcloud._testing import _Monkey - from gcloud import credentials - - APP_IDENTITY = self.APP_IDENTITY - SERVICE_ACCOUNT_NAME = APP_IDENTITY.service_account_name - - CREDENTIALS = AppAssertionCredentials([]) - - with _Monkey(credentials, _GAECreds=AppAssertionCredentials, - app_identity=APP_IDENTITY): - found = self._callFUT(CREDENTIALS) - - self.assertEqual(found, SERVICE_ACCOUNT_NAME) - - class Test__get_signed_query_params(unittest2.TestCase): def _callFUT(self, credentials, expiration, string_to_sign): @@ -327,112 +113,21 @@ def test_it(self): from gcloud._testing import _Monkey from gcloud import credentials as MUT - _called_get_sig = [] SIG_BYTES = b'DEADBEEF' - - def mock_get_sig_bytes(creds, string_to_sign): - _called_get_sig.append((creds, string_to_sign)) - return SIG_BYTES - - _called_get_name = [] ACCOUNT_NAME = object() - - def mock_get_name(creds): - _called_get_name.append((creds,)) - return ACCOUNT_NAME - - CREDENTIALS = object() + CREDENTIALS = _Credentials(sign_result=SIG_BYTES, + service_account_email=ACCOUNT_NAME) EXPIRATION = 100 STRING_TO_SIGN = 'dummy_signature' - with _Monkey(MUT, _get_signature_bytes=mock_get_sig_bytes, - _get_service_account_name=mock_get_name): - result = self._callFUT(CREDENTIALS, EXPIRATION, - STRING_TO_SIGN) + result = self._callFUT(CREDENTIALS, EXPIRATION, + STRING_TO_SIGN) self.assertEqual(result, { 'GoogleAccessId': ACCOUNT_NAME, 'Expires': str(EXPIRATION), 'Signature': base64.b64encode(b'DEADBEEF'), }) - self.assertEqual(_called_get_sig, - [(CREDENTIALS, STRING_TO_SIGN)]) - self.assertEqual(_called_get_name, [(CREDENTIALS,)]) - - -class Test__get_pem_key(unittest2.TestCase): - - def _callFUT(self, credentials): - from gcloud.credentials import _get_pem_key - return _get_pem_key(credentials) - - def test_bad_argument(self): - self.assertRaises(TypeError, self._callFUT, None) - - def test_signed_jwt_for_p12(self): - from oauth2client import service_account - from gcloud._testing import _Monkey - from gcloud import credentials as MUT - - PRIVATE_KEY = b'dummy_private_key_text' - SIGNER = object() - credentials = service_account.ServiceAccountCredentials( - 'dummy_service_account_email', SIGNER) - credentials._private_key_pkcs12 = PRIVATE_KEY - credentials._private_key_password = password = 'password-nope' - - crypt = _Crypt() - load_result = object() - openssl_crypto = _OpenSSLCrypto(load_result, None) - - with _Monkey(MUT, crypt=crypt, crypto=openssl_crypto): - result = self._callFUT(credentials) - - self.assertEqual(crypt._private_key_text, PRIVATE_KEY) - self.assertEqual(crypt._private_key_password, password) - self.assertEqual(result, load_result) - self.assertEqual(openssl_crypto._loaded, - [(openssl_crypto.FILETYPE_PEM, _Crypt._KEY)]) - self.assertEqual(openssl_crypto._signed, []) - - def test_service_account_via_json_key(self): - from oauth2client import service_account - from gcloud._testing import _Monkey - from gcloud import credentials as MUT - - scopes = [] - - PRIVATE_TEXT = 'dummy_private_key_pkcs8_text' - SIGNER = object() - credentials = service_account.ServiceAccountCredentials( - 'dummy_service_account_email', SIGNER, scopes=scopes) - credentials._private_key_pkcs8_pem = PRIVATE_TEXT - - load_result = object() - openssl_crypto = _OpenSSLCrypto(load_result, None) - - with _Monkey(MUT, crypto=openssl_crypto): - result = self._callFUT(credentials) - - self.assertEqual(result, load_result) - self.assertEqual(openssl_crypto._loaded, - [(openssl_crypto.FILETYPE_PEM, PRIVATE_TEXT)]) - self.assertEqual(openssl_crypto._signed, []) - - def test_without_pyopenssl(self): - from oauth2client import service_account - from gcloud._testing import _Monkey - from gcloud import credentials as credentials_mod - - PRIVATE_TEXT = 'dummy_private_key_pkcs8_text' - SIGNER = object() - - credentials = service_account.ServiceAccountCredentials( - 'dummy_service_account_email', SIGNER) - credentials._private_key_pkcs8_pem = PRIVATE_TEXT - - with _Monkey(credentials_mod, crypto=None): - with self.assertRaises(EnvironmentError): - self._callFUT(credentials) + self.assertEqual(CREDENTIALS._signed, [STRING_TO_SIGN]) class Test__get_expiration_seconds(unittest2.TestCase): @@ -519,7 +214,16 @@ def test_w_timedelta_days(self): class _Credentials(object): - service_account_name = 'testing@example.com' + + def __init__(self, service_account_email='testing@example.com', + sign_result=''): + self.service_account_email = service_account_email + self._sign_result = sign_result + self._signed = [] + + def sign_blob(self, bytes_to_sign): + self._signed.append(bytes_to_sign) + return None, self._sign_result class _Client(object): @@ -534,65 +238,3 @@ def get_application_default(): return self._signed self.GoogleCredentials = GoogleCredentials - - -class _Crypt(object): - - _pkcs12_key_as_pem_called = False - _KEY = '__PEM__' - - def pkcs12_key_as_pem(self, private_key_text, private_key_password): - self._pkcs12_key_as_pem_called = True - self._private_key_text = private_key_text - self._private_key_password = private_key_password - return self._KEY - - -class _OpenSSLCrypto(object): - - FILETYPE_PEM = object() - - def __init__(self, load_result, sign_result): - self._loaded = [] - self._load_result = load_result - self._signed = [] - self._sign_result = sign_result - - def load_privatekey(self, key_type, key_text): - self._loaded.append((key_type, key_text)) - return self._load_result - - def sign(self, pkey, to_sign, sign_algo): - self._signed.append((pkey, to_sign, sign_algo)) - return self._sign_result - - -class _AppIdentity(object): - - def __init__(self, service_account_name): - self._strings_signed = [] - self.service_account_name = service_account_name - - def get_service_account_name(self): - return self.service_account_name - - def sign_blob(self, string_to_sign): - self._strings_signed.append(string_to_sign) - throwaway = object() - return throwaway, string_to_sign - - -class _MockDB(object): - - Model = object - Property = object - StringProperty = object - _stored = [] - - @staticmethod - def non_transactional(*args, **kwargs): - _MockDB._stored.append((args, kwargs)) # To please lint. - - def do_nothing_wrapper(func): - return func - return do_nothing_wrapper diff --git a/setup.py b/setup.py index a3d5fe54809e..07c0e9d821c5 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ REQUIREMENTS = [ 'httplib2 >= 0.9.1', 'googleapis-common-protos', - 'oauth2client >= 2.0.0.post1', + 'oauth2client >= 2.0.1', 'protobuf >= 3.0.0b2', 'pyOpenSSL', 'six',