Skip to content

Commit

Permalink
Adding 3-legged auth for users getting started.
Browse files Browse the repository at this point in the history
Addresses final part of googleapis#335.
  • Loading branch information
dhermes committed Jan 13, 2015
1 parent 8a42ef5 commit 5dd60f9
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 7 deletions.
111 changes: 111 additions & 0 deletions gcloud/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@

"""A simple wrapper around the OAuth2 credentials library."""

import argparse
import json
import os
import sys
import tempfile
import six

from oauth2client import client
from oauth2client import file
from oauth2client import tools


def get_credentials():
Expand Down Expand Up @@ -59,6 +68,108 @@ def get_credentials():
return client.GoogleCredentials.get_application_default()


def _store_user_credential(credential):
"""Stores a user credential as a well-known file.
Prompts user first if they want to store the minted token and
then prompts the user for a filename to store the token
information in the format needed for get_credentials().
:type credential: :class:`oauth2client.client.OAuth2Credentials`
:param credential: A user credential to be stored.
"""
ans = six.moves.input('Would you like to store your tokens '
'for future use? [y/n] ')
if ans.strip().lower() != 'y':
return

filename = six.moves.input('Please name the file where you wish '
'to store them: ').strip()

payload = {
'client_id': credential.client_id,
'client_secret': credential.client_secret,
'refresh_token': credential.refresh_token,
'type': 'authorized_user',
}
with open(filename, 'w') as file_obj:
json.dump(payload, file_obj, indent=2, sort_keys=True,
separators=(',', ': '))
file_obj.write('\n')

print 'Saved %s' % (filename,)
print 'If you would like to use these credentials in the future'
print 'without having to initiate the authentication flow in your'
print 'browser, please set the GOOGLE_APPLICATION_CREDENTIALS'
print 'environment variable:'
print ' export GOOGLE_APPLICATION_CREDENTIALS=%r' % (filename,)
print 'Once you\'ve done this, you can use the get_credentials()'
print 'function, which relies on that environment variable.'
print ''
print 'Keep in mind, the refresh token can only be used with the'
print 'scopes you granted in the original authorization.'


def get_credentials_from_user_flow(scope, client_secrets_file=None):
"""Gets credentials by taking user through 3-legged auth flow.
The necessary information to perform the flow will be stored in a client
secrets file. This can be downloaded from the Google Cloud Console. First,
visit "APIs & auth > Credentials", and creating a new client ID for an
"Installed application" (or use an existing "Client ID for native
application"). Then click "Download JSON" on your chosen "Client ID for
native application" and save the client secrets file.
You can either pass this filename in directly via 'client_secrets_file'
or set the environment variable GCLOUD_CLIENT_SECRETS.
For more information, see:
developers.google.com/api-client-library/python/guide/aaa_client_secrets
:type scope: string or tuple of string
:param scope: The scope against which to authenticate. (Different services
require different scopes, check the documentation for which
scope is required for the different levels of access to any
particular API.)
:type client_secrets_file: string
:param client_secrets_file: Optional. File containing client secrets JSON.
:rtype: :class:`oauth2client.client.OAuth2Credentials`
:returns: A new credentials instance.
:raises: ``EnvironmentError`` if stdout is not a TTY or
``ValueError`` if the client secrets file is not for an installed
application
"""
if not sys.stdout.isatty():
raise EnvironmentError('Cannot initiate user flow unless user can '
'interact with standard out.')

if client_secrets_file is None:
client_secrets_file = os.getenv('GCLOUD_CLIENT_SECRETS')

client_type, client_info = client.clientsecrets.loadfile(
client_secrets_file)
if client_type != client.clientsecrets.TYPE_INSTALLED:
raise ValueError('Client secrets file must be for '
'installed application.')

redirect_uri = client_info['redirect_uris'][0]
flow = client.flow_from_clientsecrets(client_secrets_file, scope,
redirect_uri=redirect_uri)

parser = argparse.ArgumentParser(parents=[tools.argparser])
flags = parser.parse_args()
storage = file.Storage(tempfile.mktemp())
credential = tools.run_flow(flow, storage, flags)
# Remove the tempfile as a store for the credentials to prevent
# future writes to a non-existent file.
credential.store = None
# Determine if the user would like to store these credentials.
_store_user_credential(credential)
return credential


def get_for_service_account_p12(client_email, private_key_path, scope=None):
"""Gets the credentials for a service account.
Expand Down
225 changes: 218 additions & 7 deletions gcloud/test_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
import unittest2


class TestCredentials(unittest2.TestCase):
class Test_get_for_service_account_p12(unittest2.TestCase):

def test_get_for_service_account_p12_wo_scope(self):
def _callFUT(self, client_email, private_key_path, scope=None):
from gcloud.credentials import get_for_service_account_p12
return get_for_service_account_p12(client_email, private_key_path,
scope=scope)

def test_wo_scope(self):
from tempfile import NamedTemporaryFile
from gcloud import credentials
from gcloud._testing import _Monkey
Expand All @@ -28,8 +33,7 @@ def test_get_for_service_account_p12_wo_scope(self):
with NamedTemporaryFile() as file_obj:
file_obj.write(PRIVATE_KEY)
file_obj.flush()
found = credentials.get_for_service_account_p12(
CLIENT_EMAIL, file_obj.name)
found = self._callFUT(CLIENT_EMAIL, file_obj.name)
self.assertTrue(found is client._signed)
expected_called_with = {
'service_account_name': CLIENT_EMAIL,
Expand All @@ -38,7 +42,7 @@ def test_get_for_service_account_p12_wo_scope(self):
}
self.assertEqual(client._called_with, expected_called_with)

def test_get_for_service_account_p12_w_scope(self):
def test_w_scope(self):
from tempfile import NamedTemporaryFile
from gcloud import credentials
from gcloud._testing import _Monkey
Expand All @@ -50,8 +54,8 @@ def test_get_for_service_account_p12_w_scope(self):
with NamedTemporaryFile() as file_obj:
file_obj.write(PRIVATE_KEY)
file_obj.flush()
found = credentials.get_for_service_account_p12(
CLIENT_EMAIL, file_obj.name, SCOPE)
found = self._callFUT(CLIENT_EMAIL, file_obj.name,
scope=SCOPE)
self.assertTrue(found is client._signed)
expected_called_with = {
'service_account_name': CLIENT_EMAIL,
Expand All @@ -61,6 +65,192 @@ def test_get_for_service_account_p12_w_scope(self):
self.assertEqual(client._called_with, expected_called_with)


class Test__store_user_credential(unittest2.TestCase):

def _callFUT(self, credential):
from gcloud.credentials import _store_user_credential
return _store_user_credential(credential)

def test_user_input_no(self):
import six.moves
from gcloud._testing import _Monkey

_called_messages = []

def fake_input(message):
_called_messages.append(message)
# 'y' or 'Y' are the only acceptable values.
return 'neither yes nor no'

with _Monkey(six.moves, input=fake_input):
self._callFUT(None)

self.assertEqual(
_called_messages,
['Would you like to store your tokens for future use? [y/n] '])

def test_user_input_yes(self):
import json
import six.moves
import tempfile

from gcloud._testing import _Monkey
from oauth2client.client import OAuth2Credentials

_called_messages = []
# In reverse order so we can use .pop().
TEMPFILE = tempfile.mktemp()
responses = [TEMPFILE, 'y']

def fake_input(message):
_called_messages.append(message)
return responses.pop()

CLIENT_ID = 'FOO'
CLIENT_SECRET = 'BAR'
REFRESH_TOKEN = 'BAZ'
CREDENTIALS = OAuth2Credentials(None, CLIENT_ID, CLIENT_SECRET,
REFRESH_TOKEN, None, None, None)
with _Monkey(six.moves, input=fake_input):
self._callFUT(CREDENTIALS)

self.assertEqual(
_called_messages,
['Would you like to store your tokens for future use? [y/n] ',
'Please name the file where you wish to store them: '])

with open(TEMPFILE, 'r') as file_obj:
STORED_CREDS = json.load(file_obj)

expected_creds = {
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'refresh_token': REFRESH_TOKEN,
'type': 'authorized_user',
}
self.assertEqual(STORED_CREDS, expected_creds)


class Test_get_credentials_from_user_flow(unittest2.TestCase):

def _callFUT(self, scope, client_secrets_file=None):
from gcloud.credentials import get_credentials_from_user_flow
return get_credentials_from_user_flow(
scope, client_secrets_file=client_secrets_file)

def test_no_tty(self):
import sys
from gcloud._testing import _Monkey

STDOUT = _MockStdout(isatty=False)
with _Monkey(sys, stdout=STDOUT):
with self.assertRaises(EnvironmentError):
self._callFUT(None)

def test_filename_from_environ(self):
import os
import sys

from gcloud._testing import _Monkey
from oauth2client import client

STDOUT = _MockStdout(isatty=True)
FILENAME = 'FOO'
GCLOUD_KEY = 'GCLOUD_CLIENT_SECRETS'
FAKE_ENVIRON = {GCLOUD_KEY: FILENAME}

_called_keys = []

def fake_getenv(key):
_called_keys.append(key)
return FAKE_ENVIRON.get(key)

_called_filenames = []

def fake_loadfile(filename):
_called_filenames.append(filename)
return 'NOT_INSTALLED_TYPE', None

with _Monkey(sys, stdout=STDOUT):
with _Monkey(os, getenv=fake_getenv):
with _Monkey(client.clientsecrets, loadfile=fake_loadfile):
with self.assertRaises(ValueError):
self._callFUT(None)

self.assertEqual(_called_keys, [GCLOUD_KEY])
self.assertEqual(_called_filenames, [FILENAME])

def test_succeeds(self):
import argparse
import sys

from gcloud._testing import _Monkey
from gcloud import credentials
from oauth2client import client
from oauth2client.file import Storage
from oauth2client import tools

STDOUT = _MockStdout(isatty=True)
SCOPE = 'SCOPE'
FILENAME = 'FILENAME'
REDIRECT_URI = 'REDIRECT_URI'
MOCK_CLIENT_INFO = {'redirect_uris': [REDIRECT_URI]}
FLOW = object()
CLIENT_ID = 'FOO'
CLIENT_SECRET = 'BAR'
REFRESH_TOKEN = 'BAZ'
CREDENTIALS = client.OAuth2Credentials(None, CLIENT_ID, CLIENT_SECRET,
REFRESH_TOKEN, None, None, None)

_called_loadfile = []

def fake_loadfile(*args, **kwargs):
_called_loadfile.append((args, kwargs))
return client.clientsecrets.TYPE_INSTALLED, MOCK_CLIENT_INFO

_called_flow_from_clientsecrets = []

def mock_flow(client_secrets_file, scope, redirect_uri=None):
_called_flow_from_clientsecrets.append(
(client_secrets_file, scope, redirect_uri))
return FLOW

_called_run_flow = []

def mock_run_flow(flow, storage, flags):
_called_run_flow.append((flow, storage, flags))
return CREDENTIALS

_called_store_user_credential = []

def store_cred(credential):
_called_store_user_credential.append(credential)

with _Monkey(sys, stdout=STDOUT):
with _Monkey(client.clientsecrets, loadfile=fake_loadfile):
with _Monkey(client, flow_from_clientsecrets=mock_flow):
with _Monkey(tools, run_flow=mock_run_flow):
with _Monkey(credentials,
_store_user_credential=store_cred):
with _Monkey(argparse,
ArgumentParser=_MockArgumentParser):
self._callFUT(SCOPE,
client_secrets_file=FILENAME)

self.assertEqual(_called_loadfile, [((FILENAME,), {})])
self.assertEqual(_called_flow_from_clientsecrets,
[(FILENAME, SCOPE, REDIRECT_URI)])

# Unpack expects a single output
run_flow_input, = _called_run_flow
self.assertEqual(len(run_flow_input), 3)
self.assertEqual(run_flow_input[0], FLOW)
self.assertTrue(isinstance(run_flow_input[1], Storage))
self.assertTrue(run_flow_input[2] is _MockArgumentParser._MARKER)

self.assertEqual(_called_store_user_credential, [CREDENTIALS])


class _Credentials(object):

service_account_name = 'testing@example.com'
Expand All @@ -85,3 +275,24 @@ def get_application_default():
def SignedJwtAssertionCredentials(self, **kw):
self._called_with = kw
return self._signed


class _MockStdout(object):

def __init__(self, isatty=True):
self._isatty = isatty

def isatty(self):
return self._isatty


class _MockArgumentParser(object):

_MARKER = object()

def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs

def parse_args(self):
return self._MARKER
Loading

0 comments on commit 5dd60f9

Please sign in to comment.