Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move generate_signed_url to standalone function in credentials. #589

Merged
merged 1 commit into from
Feb 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions gcloud/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,19 @@

"""A simple wrapper around the OAuth2 credentials library."""

import base64
import calendar
import datetime
import urllib
import six

from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from oauth2client import client
from oauth2client import crypt
from oauth2client import service_account
import pytz


def get_credentials():
Expand Down Expand Up @@ -91,3 +103,161 @@ def get_for_service_account_p12(client_email, private_key_path, scope=None):
service_account_name=client_email,
private_key=open(private_key_path, 'rb').read(),
scope=scope)


def _get_pem_key(credentials):
"""Gets RSA key for a PEM payload from a credentials object.

:type credentials: :class:`client.SignedJwtAssertionCredentials`,
:class:`service_account._ServiceAccountCredentials`
:param credentials: The credentials used to create an RSA key
for signing text.

:rtype: :class:`Crypto.PublicKey.RSA._RSAobj`
:returns: An RSA object used to sign text.
:raises: `TypeError` if `credentials` is the wrong type.
"""
if isinstance(credentials, client.SignedJwtAssertionCredentials):
# Take our PKCS12 (.p12) key and make it into a RSA key we can use.
pem_text = crypt.pkcs12_key_as_pem(credentials.private_key,
credentials.private_key_password)
elif isinstance(credentials, service_account._ServiceAccountCredentials):
pem_text = credentials._private_key_pkcs8_text
else:
raise TypeError((credentials,
'not a valid service account credentials type'))

return RSA.importKey(pem_text)


def _get_signed_query_params(credentials, expiration, signature_string):
"""Gets query parameters for creating a signed URL.

:type credentials: :class:`client.SignedJwtAssertionCredentials`,
:class:`service_account._ServiceAccountCredentials`
:param credentials: The credentials used to create an RSA key
for signing text.

:type expiration: int or long
:param expiration: When the signed URL should expire.

:type signature_string: string
:param signature_string: The string to be signed by the credentials.

:rtype: dict
:returns: Query parameters matching the signing credentials with a
signed payload.
"""
pem_key = _get_pem_key(credentials)
# Sign the string with the RSA key.
signer = PKCS1_v1_5.new(pem_key)
signature_hash = SHA256.new(signature_string)
signature_bytes = signer.sign(signature_hash)
signature = base64.b64encode(signature_bytes)

if isinstance(credentials, client.SignedJwtAssertionCredentials):
service_account_name = credentials.service_account_name
elif isinstance(credentials, service_account._ServiceAccountCredentials):
service_account_name = credentials._service_account_email
# We know one of the above must occur since `_get_pem_key` fails if not.
return {
'GoogleAccessId': service_account_name,
'Expires': str(expiration),
'Signature': signature,
}


def _utcnow(): # pragma: NO COVER testing replaces
"""Returns current time as UTC datetime.

NOTE: on the module namespace so tests can replace it.
"""
return datetime.datetime.utcnow()


def _get_expiration_seconds(expiration):
"""Convert 'expiration' to a number of seconds in the future.

:type expiration: int, long, datetime.datetime, datetime.timedelta
:param expiration: When the signed URL should expire.

:rtype: int
:returns: a timestamp as an absolute number of seconds.
"""
# If it's a timedelta, add it to `now` in UTC.
if isinstance(expiration, datetime.timedelta):
now = _utcnow().replace(tzinfo=pytz.utc)
expiration = now + expiration

# If it's a datetime, convert to a timestamp.
if isinstance(expiration, datetime.datetime):
# Make sure the timezone on the value is UTC
# (either by converting or replacing the value).
if expiration.tzinfo:
expiration = expiration.astimezone(pytz.utc)
else:
expiration = expiration.replace(tzinfo=pytz.utc)

# Turn the datetime into a timestamp (seconds, not microseconds).
expiration = int(calendar.timegm(expiration.timetuple()))

if not isinstance(expiration, six.integer_types):
raise TypeError('Expected an integer timestamp, datetime, or '
'timedelta. Got %s' % type(expiration))
return expiration


def generate_signed_url(credentials, resource, expiration,
api_access_endpoint='',
method='GET', content_md5=None,
content_type=None):
"""Generate signed URL to provide query-string auth'n to a resource.

:type credentials: :class:`oauth2client.appengine.AppAssertionCredentials`
:param credentials: Credentials object with an associated private key to
sign text.

:type resource: string
:param resource: A pointer to a specific resource
(typically, ``/bucket-name/path/to/blob.txt``).

:type expiration: int, long, datetime.datetime, datetime.timedelta
:param expiration: When the signed URL should expire.

:type api_access_endpoint: string
:param api_access_endpoint: Optional URI base. Defaults to empty string.

:type method: string
:param method: The HTTP verb that will be used when requesting the URL.

:type content_md5: string
:param content_md5: The MD5 hash of the object referenced by
``resource``.

:type content_type: string
:param content_type: The content type of the object referenced by
``resource``.

:rtype: string
:returns: A signed URL you can use to access the resource
until expiration.
"""
expiration = _get_expiration_seconds(expiration)

# Generate the string to sign.
signature_string = '\n'.join([
method,
content_md5 or '',
content_type or '',
str(expiration),
resource])

# Set the right query parameters.
query_params = _get_signed_query_params(credentials,
expiration,
signature_string)

# Return the built URL.
return '{endpoint}{resource}?{querystring}'.format(
endpoint=api_access_endpoint, resource=resource,
querystring=urllib.urlencode(query_params))
12 changes: 9 additions & 3 deletions gcloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
from _gcloud_vendor.apitools.base.py import http_wrapper
from _gcloud_vendor.apitools.base.py import transfer

from gcloud.credentials import generate_signed_url
from gcloud.storage._helpers import _PropertyMixin
from gcloud.storage._helpers import _scalar_property
from gcloud.storage.acl import ObjectACL


_API_ACCESS_ENDPOINT = 'https://storage.googleapis.com'


class Blob(_PropertyMixin):
"""A wrapper around Cloud Storage's concept of an ``Object``."""

Expand Down Expand Up @@ -157,9 +161,11 @@ def generate_signed_url(self, expiration, method='GET'):
resource = '/{bucket_name}/{quoted_name}'.format(
bucket_name=self.bucket.name,
quoted_name=urllib.quote(self.name, safe=''))
return self.connection.generate_signed_url(resource=resource,
expiration=expiration,
method=method)

return generate_signed_url(
self.connection.credentials, resource=resource,
api_access_endpoint=_API_ACCESS_ENDPOINT,
expiration=expiration, method=method)

def exists(self):
"""Determines whether or not this blob exists.
Expand Down
Loading