diff --git a/README.md b/README.md index a36650f..d2cabf1 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ pip install pyxero ### Using OAuth2 Credentials OAuth2 is an open standard authorization protocol that allows users to -provide specific permissions to apps that want to use their account. OAuth2 +provide specific permissions to apps that want to use their account. OAuth2 authentication is performed using *tokens* that are obtained using an API; these tokens are then provided with each subsequent request. @@ -186,12 +186,152 @@ def some_view_which_calls_xero(request): ... ``` +### Using PKCE Credentials + +PKCE is an alternative flow for providing authentication via OAuth2. It works +largely the same as the standard OAuth2 mechanism, but unlike the normal flow is +designed to work with applications which cannot keep private keys secure, such +as desktop, mobile or single page apps where such secrets could be extracted. A +client ID is still required. + +As elsewhere, OAuth2 tokens have a 30 minute expiry, but can be only swapped for +a new token if the `offline_access` scope is requested. + +Xero documentation on the PKCE flow can be found +[here](https://developer.xero.com/documentation/guides/oauth2/pkce-flow). The +procedure for creating and authenticating credentials is as follows *(with a CLI +example at the end)*: + + 1) [Register your app](https://developer.xero.com/myapps) with Xero, using a + redirect URI which will be served by your app in order to complete the + authorisation e.g. `http://localhost:/callback/`. You can chose any + port, anc can pass it to the credentials object on construction, allow with + the the Client Id you are provded with. + + 2) Construct an `OAuth2Credentials` instance using the details from the first + step. + + ```python + >>> from xero.auth import OAuth2Credentials + >>> + >>> credentials = OAuth2PKCECredentials(client_id, port=my_port) + ``` + + If neccessary, pass in a list of scopes to define the scopes required by + your app. E.g. if write access is required to transactions and payroll + employees: + + ```python + >>> from xero.constants import XeroScopes + >>> + >>> my_scope = [XeroScopes.ACCOUNTING_TRANSACTIONS, + >>> XeroScopes.PAYROLL_EMPLOYEES] + >>> credentials = OAuth2Credentials(client_id, scope=my_scope + >>> port=my_port) + ``` + + The default scopes are `['offline_access', 'accounting.transactions.read', + 'accounting.contacts.read']`. `offline_access` is required in order for + tokens to be refreshable. For more details on scopes see [Xero's + documentation on oAuth2 + scopes](https://developer.xero.com/documentation/oauth2/scopes). + + 3) Call `credentials.logon()` . This will open a browser window, an visit + a Xero authentication page. + + ```python + >>> credentials.logon() + ``` + + The Authenticator will also start a local webserver on the provided port. + This webserver will be used to collect the tokens that Xero returns. + + The default `PCKEAuthReceiver` class has no reponse pages defined so the + browser will show an error, on empty page for all transactions. But the + application is now authorised and will continue. If you wish you can + override the `send_access_ok()` method, and the `send_error_page()` method + to create a more userfriendly experience. + + In either case once the callback url has been visited the local server will + shutdown. + + 4) You can now continue as per the normal OAuth2 flow. Now the credentials may + be used to authorize a Xero session. As OAuth2 allows authentication for + multiple Xero Organisations, it is necessary to set the tenant_id against + which the xero client's queries will run. + + ```python + >>> from xero import Xero + >>> # Use the first xero organisation (tenant) permitted + >>> credentials.set_default_tenant() + >>> xero = Xero(credentials) + >>> xero.contacts.all() + >>> ... + ``` + If the scopes supplied in Step 2 did not require access to organisations + (e.g. when only requesting scopes for single sign) it will not be possible + to make requests with the Xero API and `set_default_tenant()` will raise an + exception. + + To pick from multiple possible Xero organisations the `tenant_id` may be set + explicitly: + + ```python + >>> tenants = credentials.get_tenants() + >>> credentials.tenant_id = tenants[1]['tenantId'] + >>> xero = Xero(credentials) + ``` + `OAuth2Credentials.__init__()` accepts `tenant_id` as a keyword argument. + + 5) When using the API over an extended period, you will need to exchange tokens + when they expire. If a refresh token is available, it can be used to + generate a new token: + + ```python + >>> if credentials.expired(): + >>> credentials.refresh() + >>> # Then store the new credentials or token somewhere for future use: + >>> cred_state = credentials.state + >>> # or + >>> new_token = credentials.token + + **Important**: ``credentials.state`` changes after a token swap. Be sure to + persist the new state. + + ``` + +#### CLI OAuth2 App Example + +This example shows authorisation, automatic token refreshing and API use in +a Django app which has read/write access to contacts and transactions. + +Each time this app starts it asks for authentication, but you +could consider using the user `keyring` to store tokens. + +```python +from xero import Xero +from xero.auth import OAuth2PKCECredentials +from xero.constants import XeroScopes + +# Get client_id, client_secret from config file or settings then +credentials = OAuth2PKCECredentials( + client_id, port=8080, + scope=[XeroScopes.OFFLINE_ACCESS, XeroScopes.ACCOUNTING_CONTACTS, + XeroScopes.ACCOUNTING_TRANSACTIONS] +) +credentials.logon() +credentials.set_default_tenant() + +for contacts in xero.contacts.all() + print contact["Name"] +``` + ### Older authentication methods ### -In the past, Xero had the concept of "Public", "Private", and "Partner" +In the past, Xero had the concept of "Public", "Private", and "Partner" applications, which each had their own authentication procedures. However, -they removed access for Public applications on 31 March 2021; Private -applications were removed on 30 September 2021. Partner applications +they removed access for Public applications on 31 March 2021; Private +applications were removed on 30 September 2021. Partner applications still exist, but the only supported authentication method is OAuth2; these are now referred to as "OAuth2 apps". As Xero no longer supports these older authentication methods, neither does PyXero. @@ -446,4 +586,3 @@ New features or bug fixes can be submitted via a pull request. If you want your pull request to be merged quickly, make sure you either include regression test(s) for the behavior you are adding/fixing, or provide a good explanation of why a regression test isn't possible. - diff --git a/src/xero/auth.py b/src/xero/auth.py index 306e982..30e0f7f 100644 --- a/src/xero/auth.py +++ b/src/xero/auth.py @@ -1,8 +1,15 @@ from __future__ import unicode_literals +import base64 import datetime +import hashlib +import http.server import requests -from six.moves.urllib.parse import parse_qs, urlencode +import secrets +import threading +import webbrowser +from functools import partial +from six.moves.urllib.parse import parse_qs, urlencode, urlparse from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER from requests_oauthlib import OAuth1, OAuth2, OAuth2Session @@ -708,3 +715,192 @@ def _handle_error_response(response): raise XeroNotAvailable(response) else: raise XeroExceptionUnknown(response) + + +class PKCEAuthReceiver(http.server.BaseHTTPRequestHandler): + """ This is an http request processsor for server running on localhost, + used by the PKCE auth system. + Xero will redirect the browser after auth, from which we + can collect the toke Xero provides. + + You can subclass this and override the `send_error_page` and + `send_access_ok` methods to customise the sucess and failure + pages displayed in the browser. + """ + def __init__(self, credmanager, *args, **kwargs): + self.credmanager = credmanager + super().__init__(*args, **kwargs) + + @staticmethod + def close_server(s): + s.shutdown() + + def do_GET(self, *args): + request = urlparse(self.path) + params = parse_qs(request.query) + + if request.path == "/callback": + self.credmanager.verify_url(params, self) + else: + self.send_error_page("Unknown endpoint") + + def send_error_page(self, error): + """Display an Error page. + Override this for a custom page. + """ + print("Error:", error) + + def send_access_ok(self): + """Display a success page" + Override this to provide a custom page. + """ + print("LOGIN SUCCESS") + self.shutdown() + + def shutdown(self): + """Start shutdowning our server and return immediately""" + # Launch a thread to close our socket cleanly. + threading.Thread(target=self.__class__.close_server, args=(self.server,)).start() + + +class OAuth2PKCECredentials(OAuth2Credentials): + """An object wrapping the PKCE credential flow for Xero access. + + Usage: + 1) Construct an `OAuth2Credentials` instance: + + >>> from xero.auth import OAuth2PKCECredentials + >>> credentials = OAuth2Credentials(client_id,None, port=8080, + scope=scope) + + A webserver will be setup to listen on the provded port + number which is used for the Auth callback. + + 2) Send the login request. + >>> credentials.logon() + + This will open a browser window which will naviage to a Xero + login page. The Use should grant your application access (or not), + and will be redirected to a locally running webserver to capture + the auth tokens. + + + 3) Use the credentials. It is usually necessary to set the tenant_id (Xero + organisation id) to specify the organisation against which the queries should + run: + >>> from xero import Xero + >>> credentials.set_default_tenant() + >>> xero = Xero(credentials) + >>> xero.contacts.all() + ... + + To use a different organisation, set credentials.tenant_id: + >>> tenants = credentials.get_tenants() + >>> credentials.tenant_id = tenants[1]['tenantId'] + + 4) If a refresh token is available, it can be used to generate a new token: + >>> if credentials.expired(): + >>> credentials.refresh() + + Note that in order for tokens to be refreshable, Xero API requires + `offline_access` to be included in the scope. + + :param port: the port the local webserver will listen ro + :param verifier: (optional) a string verifier token if not + provided the module will generate it's own + :param request_handler: An HTTP request handler class. This will + be used to handler the callback request. If + you wish to customise your error page, this is + where you pass in you custom class. + :param callback_uri: Allow customisation of the callback uri. Only + useful if you've customised the request_handler + to match. + + :param scope: Inhereited from Oath2Credentials. + """ + def __init__(self, *args, **kwargs): + self.port = kwargs.pop('port', 8080) + # Xero requires between 43 adn 128 bytes, it fails + # with invlaid grant if this is not long enough + self.verifier = kwargs.pop('verifier', secrets.token_urlsafe(64)) + self.handler_kls = kwargs.pop('request_handler', PKCEAuthReceiver) + self.error = None + if isinstance(self.verifier, str): + self.verifier = self.verifier.encode('ascii') + + kwargs.setdefault('callback_uri', f"http://localhost:{self.port}/callback") + super().__init__(*args, **kwargs) + + def logon(self): + """Launch PKCE auth process and wait for completion""" + challenge = str( + base64.urlsafe_b64encode( + hashlib.sha256( + self.verifier + ).digest() + )[:-1], 'ascii' + ) + url_base = super().generate_url() + webbrowser.open( + f"{url_base}&code_challenge={challenge}&" + "code_challenge_method=S256" + ) + self.wait_for_callback() + + def wait_for_callback(self): + listen_to = ('', self.port) + s = http.server.HTTPServer( + listen_to, + partial(self.handler_kls, self) + ) + s.serve_forever() + if self.error: + raise self.error + + def verify_url(self, params, reqhandler): + """Used to verify the parameters xero returns in the + redirect callback""" + error = params.get('error', None) + if error: + self.handle_error(error, reqhandler) + return + + if params['state'][0] != self.state['auth_state']: + self.handle_error("State Mismatch", reqhandler) + return + + code = params.get('code', None) + if code: + try: + self.get_token(code[0]) + except Exception as e: + self.error = e + reqhandler.send_error_page(str(e)) + reqhandler.shutdown() + + reqhandler.send_access_ok() + + def get_token(self, code): + # Does the third leg, to get the actual auth token from Xero, + # once the authentication has been 'approved' by the user + resp = requests.post( + url=XERO_OAUTH2_TOKEN_URL, + data={ + 'grant_type': 'authorization_code', + 'client_id': self.client_id, + 'redirect_uri': self.callback_uri, + 'code': code, + 'code_verifier': self.verifier + } + ) + respdata = resp.json() + error = respdata.get('error', None) + if error: + raise XeroAccessDenied(error) + + self._init_oauth(respdata) + + def handle_error(self, msg, handler): + self.error = RuntimeError(msg) + handler.send_error_page(msg) + handler.shutdown() diff --git a/tests/test_auth.py b/tests/test_auth.py index afe45ec..c6c56e4 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,4 +1,5 @@ import json +import requests import time import unittest from datetime import datetime, timedelta @@ -8,7 +9,9 @@ from xero.api import Xero from xero.auth import ( OAuth2Credentials, + OAuth2PKCECredentials, PartnerCredentials, + PKCEAuthReceiver, PrivateCredentials, PublicCredentials, ) @@ -25,7 +28,7 @@ class PublicCredentialsTest(unittest.TestCase): @patch("requests.post") def test_initial_constructor(self, r_post): - "Initial construction causes a requst to get a request token" + "Initial construction causes a request to get a request token" r_post.return_value = Mock( status_code=200, text="oauth_token=token&oauth_token_secret=token_secret" ) @@ -570,3 +573,174 @@ def test_set_default_tenant_raises_exception(self): ) with self.assertRaises(XeroException): credentials.set_default_tenant() + + +class PKCERequestHandlerDummy: + def __init__(self,): + self.error = None + self.is_shutdown = False + self.success = False + + def send_error_page(self, error): + self.error = error + + def send_access_ok(self, ): + self.success = True + + def shutdown(self,): + self.is_shutdown = True + + +class PKCECredentialsTest(unittest.TestCase): + # Mostly the same in principle as the Oauth2 ones, + # but just include tests where behavior is different. + callback_uri = "http://localhost:8123/callback" + + def setUp(self): + super().setUp() + # Create an expired token to be used by tests + self.expired_token = { + "access_token": "1234567890", + "expires_in": 1800, + "token_type": "Bearer", + "refresh_token": "0987654321", + # 'expires_at': datetime.utcnow().timestamp()} + "expires_at": time.time(), + } + + def test_verification_using_bad_auth_uri(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state" + ) + bad_auth_params = {'error': 'access_denied', 'state': credentials.auth_state} + + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(bad_auth_params, rhandler) + self.assertEqual(rhandler.error, 'access_denied') + self.assertTrue(rhandler.is_shutdown, ) + + def test_verification_using_bad_state(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state" + ) + bad_auth_params = {'code': '213456789', 'state': "wrong_state"} + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(bad_auth_params, rhandler) + self.assertEqual(rhandler.error, 'State Mismatch') + self.assertTrue(rhandler.is_shutdown, ) + + @patch("requests_oauthlib.OAuth2Session.request") + def test_verification_success(self, r_request): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state", + port=8123, + ) + params = { + 'code': ['0123456789'], + 'scope': credentials.scope, + 'state': [credentials.auth_state] + } + now = datetime.now().timestamp() + with patch.object(requests, 'post', return_value=Mock( + status_code=200, + request=Mock(headers={}, body=""), + headers={}, + json=lambda: { + "access_token": "1234567890", + "expires_at": now + 1800, + "token_type": "Bearer", + "refresh_token": "0987654321"} + )) as r_request: + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(params, rhandler) + self.assertIs(rhandler.error, None) + self.assertTrue(r_request.called) + self.assertTrue(credentials.token) + self.assertTrue(credentials.oauth) + self.assertFalse(credentials.expired()) + + # Finally test the state + self.assertEqual( + credentials.state, + { + "client_id": credentials.client_id, + "client_secret": credentials.client_secret, + "auth_state": credentials.auth_state, + "scope": credentials.scope, + "user_agent": credentials.user_agent, + "token": credentials.token, + "callback_uri": self.callback_uri + }, + ) + + def test_verification_failure(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state" + ) + params = { + 'code': ['0123456789'], + 'scope': credentials.scope, + 'state': [credentials.auth_state] + } + with patch.object(requests, 'post', return_value=Mock( + status_code=400, + request=Mock(headers={}, body=""), + headers={}, + json=lambda: {"error": "invalid_grant"} + )): + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(params, rhandler) + + self.assertIsInstance(credentials.error, XeroAccessDenied) + + def test_logon_opens_a_webbrowser(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state", + request_handler=PKCERequestHandlerDummy, + port=8123 + ) + server = Mock() + with patch('http.server.HTTPServer', return_value=server) as hs, \ + patch('webbrowser.open') as wb: + credentials.logon() + + self.assertTrue(wb.called) + server.serve_forerver.has_been_called() + portdata = hs.call_args[0][0] + partial = hs.call_args[0][1] + self.assertEqual(portdata, ('', 8123)) + self.assertEqual(partial.func, PKCERequestHandlerDummy) + self.assertEqual(partial.args, (credentials,)) + + +class PKCECallbackHandlerTests(unittest.TestCase): + def setUp(self,): + # PKCEAuthReciever's base class has a non-trivial + # handle method which is called by it's constructor. + # For tests we need to bypass this, so we can probe + # the do_GET behaviour - hence this subclass. + class TestRx(PKCEAuthReceiver): + def handle(self, *args): + pass + + self.pkce_manager = Mock() + self.handler = TestRx(self.pkce_manager, + request=Mock(), + client_address=None, + server=Mock()) + + def test_going_somewhere_other_than_callback_errors(self,): + self.handler.path = "/foo/bar/baz.html" + self.handler.send_error_page = Mock() + self.handler.do_GET() + self.handler.send_error_page.assert_called_with("Unknown endpoint") + + def test_revieving_callback_decodes_parms_and_sends_to_verifyurl(self,): + self.handler.path = "/callback?value=123&something=foo&different=bar" + self.handler.send_error_page = Mock() + self.handler.do_GET() + self.pkce_manager.verify_url.assert_called_with({ + 'value': ['123'], + 'something': ['foo'], + 'different': ['bar'], + }, self.handler)