# oauth.py
# NOTE: the source repository was archived by its owner on Jul 4, 2021 and is
# now read-only.
"""The OAuth request handler.
Based on http://developers.facebook.com/docs/authentication/ .
"""
__author__ = ['Ryan Barrett <mockfacebook@ryanb.org>']
import base64
import logging
import os
import urllib
import urlparse
from webob import exc
import webapp2
# URL paths of the auth code and access token endpoints. Both are interpolated
# into the "mismatched ... values" error message built by
# BaseHandler.create_access_token.
AUTH_CODE_PATH = '/dialog/oauth'
ACCESS_TOKEN_PATH = '/oauth/access_token'
# Expiration value returned with every access token: as 'expires_in' in the
# client side flow redirect and as 'expires' in the access token response.
EXPIRES = '999999'
# Number of bytes read from os.urandom() for each auth code and access token.
RANDOM_BYTES = 16
# Body written by AuthCodeHandler when a required query arg is missing. The
# single %s placeholder receives the error description.
ERROR_TEXT = """
mockfacebook
Error
An error occurred: %s. Please try again later. (In the real Facebook, this is
nicely formatted HTML.)
"""
# Template for OAuthException error bodies. The single %s placeholder receives
# the message; it is substituted verbatim (no JSON escaping is applied).
ERROR_JSON = '{"error":{"type":"OAuthException","message":"%s."}}'
class BaseHandler(webapp2.RequestHandler):
    """Base handler class for OAuth handlers.

    Provides the shared database connection plus helpers for validating query
    args and for creating auth codes and access tokens.

    Attributes:
      conn: sqlite3.Connection, stored as a class attribute by init()
    """

    @classmethod
    def init(cls, conn, me=None):
        """Stores the database connection on the class.

        Args:
          conn: sqlite3.Connection
          me: unused
        """
        # me is unused
        cls.conn = conn

    def get_required_args(self, *args):
        """Checks that one or more args are in the query args.

        If any are not in args or are empty, raises an AssertionError with the
        argument name as its associated value.

        Args:
          args: tuple of strings

        Returns: list of strings

        Raises:
          AssertionError: if an arg is missing or empty. The first such arg name,
            in the order given, is the exception's value.
        """
        values = [self.request.get(arg) for arg in args]
        for arg, val in zip(args, values):
            if not val:
                # Raised explicitly instead of via an assert statement so that the
                # check is not stripped when Python runs with -O.
                raise AssertionError(arg)
        return values

    def create_auth_code(self, client_id, redirect_uri):
        """Generates, stores, and returns an auth code using the given parameters.

        The code is RANDOM_BYTES random bytes, URL-safe base64 encoded. It is
        inserted into the oauth_codes table and committed immediately.

        Args:
          client_id: string
          redirect_uri: string

        Returns: string auth code
        """
        code = base64.urlsafe_b64encode(os.urandom(RANDOM_BYTES))
        self.conn.execute(
            'INSERT INTO oauth_codes(code, client_id, redirect_uri) VALUES(?, ?, ?)',
            (code, client_id, redirect_uri))
        self.conn.commit()
        return code

    def create_access_token(self, code, client_id, redirect_uri):
        """Generates, stores, and returns an access token using the given parameters.

        The auth code must already exist in the oauth_codes table, and the client
        id and redirect URI stored with it must equal the ones passed in.

        Args:
          code: string auth code
          client_id: string
          redirect_uri: string

        Returns: string access token

        Raises:
          AssertionError: if the auth code is not found, or if client_id or
            redirect_uri differ from the values stored with the auth code. The
            exception's value is an ERROR_JSON formatted message.
        """
        cursor = self.conn.execute(
            'SELECT client_id, redirect_uri FROM oauth_codes WHERE code = ?',
            (code,))
        row = cursor.fetchone()
        # Explicit raises (not assert statements) so validation survives -O.
        if not row:
            raise AssertionError(ERROR_JSON % (
                'Error validating verification code: auth code %s not found' %
                code))

        code_client_id, code_redirect = row
        for code_arg, arg, name in ((code_client_id, client_id, 'client_id'),
                                    (code_redirect, redirect_uri, 'redirect_uri')):
            if code_arg != arg:
                raise AssertionError(ERROR_JSON % (
                    'mismatched %s values: %s received %s, %s received %s' %
                    (name, AUTH_CODE_PATH, code_arg, ACCESS_TOKEN_PATH, arg)))

        token = base64.urlsafe_b64encode(os.urandom(RANDOM_BYTES))
        self.conn.execute(
            'INSERT INTO oauth_access_tokens(code, token) VALUES(?, ?)',
            (code, token))
        self.conn.commit()
        return token
class AuthCodeHandler(BaseHandler):
    """The auth code request handler.

    Serves the auth dialog endpoint: creates an auth code for the given
    client_id and redirect_uri, then redirects back to redirect_uri with either
    the auth code (server side flow) or an access token (client side flow).
    """

    ROUTES = [(r'/dialog/oauth/?', 'oauth.AuthCodeHandler')]

    def get(self):
        """Handles an auth code request.

        Query args:
          client_id: string, required
          redirect_uri: string, required
          response_type: string, optional. 'token' selects the client side flow;
            anything else selects the server side flow.
          state: string, optional. Echoed back in the server side flow.

        If a required arg is missing, writes ERROR_TEXT to the response and
        returns without redirecting.
        """
        state = self.request.get('state')
        response_type = self.request.get('response_type')

        try:
            client_id, redirect_uri = self.get_required_args('client_id',
                                                             'redirect_uri')
        except AssertionError as e:
            # Parenthesized so the inner message is formatted first instead of
            # relying on left-to-right chaining of the % operator.
            self.response.out.write(ERROR_TEXT % ('missing %s' % unicode(e)))
            return

        code = self.create_auth_code(client_id, redirect_uri)
        # urlparse() result order: scheme, netloc, path, params, query, fragment.
        # Index 4 is the query string and index 5 is the fragment.
        redirect_parts = list(urlparse.urlparse(redirect_uri))

        if response_type == 'token':
            # client side flow. get an access token and put it in the fragment of
            # the redirect URI. (also uses expires_in, not expires.) background:
            # http://developers.facebook.com/docs/authentication/#client-side-flow
            #
            # NOTE(review): state is not echoed back in this flow - confirm that
            # this is intended.
            token = self.create_access_token(code, client_id, redirect_uri)
            if redirect_parts[5]:
                logging.warning('dropping original redirect URI fragment: %s',
                                redirect_parts[5])
            redirect_parts[5] = urllib.urlencode(
                {'access_token': token, 'expires_in': EXPIRES})
        else:
            # server side flow. just put the auth code in the query args of the
            # redirect URI. background:
            # http://developers.facebook.com/docs/authentication/#server-side-flow
            #
            # dict(parse_qsl()) here instead of just parse_qs() so that the dict
            # values are individual elements, not lists.
            redirect_args = dict(urlparse.parse_qsl(redirect_parts[4]))
            redirect_args['code'] = code
            if state:
                redirect_args['state'] = state
            redirect_parts[4] = urllib.urlencode(redirect_args)

        # wsgiref/webob expects a non Unicode redirect_url, otherwise it breaks
        # with assert type(val) is StringType, "Header values must be strings"
        # error.
        redirect_url = str(urlparse.urlunparse(redirect_parts))
        self.redirect(redirect_url)
class AccessTokenHandler(BaseHandler):
    """The access token request handler.

    Exchanges an auth code for an access token, or allocates a token directly
    for app login (grant_type=client_credentials).
    """

    ROUTES = [(r'/oauth/access_token/?', 'oauth.AccessTokenHandler')]

    @staticmethod
    def is_valid_token(conn, access_token):
        """Returns True if the given access token is valid, False otherwise.

        A token is valid if it has a row in the oauth_access_tokens table.

        Args:
          conn: sqlite3.Connection
          access_token: string
        """
        cursor = conn.execute(
            'SELECT token FROM oauth_access_tokens WHERE token = ?',
            (access_token,))
        return cursor.fetchone() is not None

    def get(self):
        """Handles a /oauth/access_token request to allocate an access token.

        Writes the response directly, as URL-encoded access_token and expires
        values.

        Query args:
          client_id: string, required
          client_secret: string, required. Only checked for presence.
          code: string auth code. Required unless grant_type is
            client_credentials, in which case it is ignored.
          redirect_uri: string, optional. Ignored for client_credentials.
          grant_type: string, optional. 'client_credentials' selects app login.

        Raises:
          exc.HTTPClientError: if a required arg is missing, the auth code is
            unknown, or client_id/redirect_uri don't match the auth code. The
            body is an ERROR_JSON formatted message.
        """
        try:
            grant_type = self.request.get('grant_type')
            redirect_uri = self.request.get('redirect_uri')
            app_login = grant_type == 'client_credentials'

            # An app login request carries no auth code, so only demand one for
            # the code exchange flow.
            required = ['client_id', 'client_secret']
            if not app_login:
                required.append('code')

            try:
                values = self.get_required_args(*required)
            except AssertionError as e:
                # Re-raised explicitly (not via `assert False`) so the error is
                # still reported when Python runs with -O.
                raise AssertionError(
                    ERROR_JSON % ('Missing %s parameter' % unicode(e)))

            client_id = values[0]
            if app_login:
                # app login. background:
                # http://developers.facebook.com/docs/authentication/#applogin
                redirect_uri = ''
                code = self.create_auth_code(client_id, redirect_uri)
            else:
                code = values[2]

            token = self.create_access_token(code, client_id, redirect_uri)

            self.response.charset = 'utf-8'
            self.response.out.write(
                urllib.urlencode({'access_token': token, 'expires': EXPIRES}))
        except AssertionError as e:
            raise exc.HTTPClientError(unicode(e).encode('utf8'))