Skip to content

Commit

Permalink
Merge pull request #16 from wkoot/python3-compatible-unicode-call
Browse files Browse the repository at this point in the history
Python3 compatible unicode call
  • Loading branch information
wkoot authored Sep 29, 2017
2 parents 23ce491 + f09ac1b commit c4ca57e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
56 changes: 30 additions & 26 deletions instagram/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _url_for_authorize(self, scope=None):
client_params.update(scope=' '.join(scope))

url_params = urlencode(client_params)
return "%s?%s" % (self.api.authorize_url, url_params)
return "{url}?{params}".format(url=self.api.authorize_url, params=url_params)

def _data_for_exchange(self, code=None, username=None, password=None, scope=None, user_id=None):
client_params = {
Expand Down Expand Up @@ -106,7 +106,7 @@ def get_authorize_login_url(self, scope=None):
url = self._url_for_authorize(scope=scope)
response, content = http_object.request(url)
if response['status'] != '200':
raise OAuth2AuthExchangeError("The server returned a non-200 response for URL %s" % url)
raise OAuth2AuthExchangeError("The server returned a non-200 response for URL {url}".format(url))

redirected_to = response['Content-Location']
return redirected_to
Expand All @@ -133,10 +133,10 @@ def __init__(self, api):
def _generate_sig(self, endpoint, params, secret):
# handle unicode when signing, urlencode can't handle otherwise.
def enc_if_str(p):
return p.encode('utf-8') if isinstance(p, unicode) else p
return p.encode('utf-8') if isinstance(p, six.text_type) else p

p = ''.join('|{}={}'.format(k, enc_if_str(params[k])) for k in sorted(params.keys()))
sig = '{}{}'.format(endpoint, p)
path = ''.join('|{key}={val}'.format(key=key, val=enc_if_str(params[key])) for key in sorted(params.keys()))
sig = '{endpoint}{path}'.format(endpoint=endpoint, path=path)
return hmac.new(secret.encode(), sig.encode(), sha256).hexdigest()

def url_for_get(self, path, parameters):
Expand All @@ -148,27 +148,30 @@ def get_request(self, path, **kwargs):
def post_request(self, path, **kwargs):
return self.make_request(self.prepare_request("POST", path, kwargs))

# TODO - make use of six.moves.urllib.parse.urlparse for all this string munging
def _full_url(self, path, include_secret=False, include_signed_request=True):
return "%s://%s%s%s%s%s" % (self.api.protocol, self.api.host, self.api.base_path, path,
self._auth_query(include_secret),
self._signed_request(path, {}, include_signed_request, include_secret))
signed_request = self._signed_request(path, {}, include_signed_request, include_secret)
return "{protocol}://{host}{basepath}{path}{query}{signed}".format(
protocol=self.api.protocol, host=self.api.host, basepath=self.api.base_path, path=path,
query=self._auth_query(include_secret), signed=signed_request)

def _full_url_with_params(self, path, params, include_secret=False, include_signed_request=True):
return (self._full_url(path, include_secret) +
self._full_query_with_params(params) +
self._signed_request(path, params, include_signed_request, include_secret))
signed_request = self._signed_request(path, params, include_signed_request, include_secret)
return "{url}{query}{signed}".format(
url=self._full_url(path, include_secret), query=self._full_query_with_params(params), signed=signed_request)

def _full_query_with_params(self, params):
params = ("&" + urlencode(params)) if params else ""
return params
if not params:
return ""
return "&{params}".format(params=urlencode(params))

def _auth_query(self, include_secret=False):
if self.api.access_token:
return ("?%s=%s" % (self.api.access_token_field, self.api.access_token))
return "?{field}={token}".format(field=self.api.access_token_field, token=self.api.access_token)
elif self.api.client_id:
base = ("?client_id=%s" % (self.api.client_id))
base = "?client_id={client_id}".format(client_id=self.api.client_id)
if include_secret:
base += "&client_secret=%s" % (self.api.client_secret)
base += "&client_secret={client_secret}".format(client_secret=self.api.client_secret)
return base

def _signed_request(self, path, params, include_signed_request, include_secret):
Expand All @@ -181,7 +184,7 @@ def _signed_request(self, path, params, include_signed_request, include_secret):
if include_secret and self.api.client_secret:
params['client_secret'] = self.api.client_secret

return "&sig=%s" % self._generate_sig(path, params, self.api.client_secret)
return "&sig={signed}".format(signed=self._generate_sig(path, params, self.api.client_secret))
else:
return ''

Expand All @@ -195,15 +198,16 @@ def get_content_type(file_name):
return mimetypes.guess_type(file_name)[0] or "application/octet-stream"

def encode_field(field_name):
return ("--" + boundary,
'Content-Disposition: form-data; name="%s"' % (field_name),
return ("--{boundary}".format(boundary=boundary),
'Content-Disposition: form-data; name="{field_name}"'.format(field_name=field_name),
"", str(params[field_name]))

def encode_file(field_name):
file_name, file_handle = files[field_name]
return ("--" + boundary,
'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, file_name),
"Content-Type: " + get_content_type(file_name),
return ("--{boundary}".format(boundary=boundary),
'Content-Disposition: form-data; name="{field_name}"; filename="{file_name}"'.format(
field_name=field_name, file_name=file_name),
"Content-Type: {content_type}".format(content_type=get_content_type(file_name)),
"", file_handle.read())

lines = []
Expand All @@ -212,10 +216,10 @@ def encode_file(field_name):
for field in files:
lines.extend(encode_file(field))

lines.extend(("--%s--" % (boundary), ""))
lines.extend(("--{boundary}--".format(boundary=boundary), ""))
body = "\r\n".join(lines)

headers = {"Content-Type": "multipart/form-data; boundary=" + boundary,
headers = {"Content-Type": "multipart/form-data; boundary={boundary}".format(boundary=boundary),
"Content-Length": str(len(body))}

return body, headers
Expand Down Expand Up @@ -243,12 +247,12 @@ def prepare_request(self, method, path, params, include_secret=False):
def make_request(self, url, method="GET", body=None, headers=None):
headers = headers or {}
if 'User-Agent' not in headers:
headers.update({"User-Agent": "%s Python Client" % self.api.api_name})
headers.update({"User-Agent": "{api_name} Python Client".format(api_name=self.api.api_name)})

# https://github.com/jcgregorio/httplib2/issues/173
# bug in httplib2 w/ Python 3 and disable_ssl_certificate_validation=True
if six.PY3:
http_obj = Http(timeout=self.api.timeout)
http_obj = Http(timeout=self.api.timeout)
else:
http_obj = Http(timeout=self.api.timeout, disable_ssl_certificate_validation=True)

Expand Down
10 changes: 10 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ def test_xauth_exchange(self):
assert access_token


class OAuth2RequestTests(unittest.TestCase):
def setUp(self):
super(OAuth2RequestTests, self).setUp()
self.api = TestInstagramAPI(access_token=access_token)
self.request = oauth2.OAuth2Request(self.api)

def test_generate_sig(self):
self.request._generate_sig(endpoint='/', params=dict(count=1), secret=client_secret)


class InstagramAPITests(unittest.TestCase):
def setUp(self):
super(InstagramAPITests, self).setUp()
Expand Down

0 comments on commit c4ca57e

Please sign in to comment.