|
1 | 1 | import uuid
|
2 |
| -from os import path |
| 2 | +from os import path, urandom |
| 3 | +import hashlib |
| 4 | +import base64 |
| 5 | +import secrets |
3 | 6 |
|
4 | 7 |
|
5 | 8 | import requests
|
6 |
| -from flask import current_app as app, url_for, redirect, render_template, request, session |
| 9 | +from flask import current_app as app, url_for, redirect, render_template, request, session, redirect |
7 | 10 | from flask_oauthlib.client import OAuth
|
| 11 | +from requests_oauthlib import OAuth2Session |
8 | 12 | from docusign_esign import ApiClient
|
9 | 13 | from docusign_esign.client.api_exception import ApiException
|
10 | 14 |
|
@@ -48,7 +52,10 @@ class DSClient:
|
48 | 52 | @classmethod
|
49 | 53 | def _init(cls, auth_type, api):
|
50 | 54 | if auth_type == "code_grant":
|
51 |
| - cls._auth_code_grant(api) |
| 55 | + if session.get("pkce_failed", False): |
| 56 | + cls._auth_code_grant(api) |
| 57 | + else: |
| 58 | + cls._pkce_auth(api) |
52 | 59 | elif auth_type == "jwt":
|
53 | 60 | cls._jwt_auth(api)
|
54 | 61 |
|
@@ -92,6 +99,29 @@ def _auth_code_grant(cls, api):
|
92 | 99 | access_token_method="POST"
|
93 | 100 | )
|
94 | 101 |
|
| 102 | + @classmethod |
| 103 | + def _pkce_auth(cls, api): |
| 104 | + """Authorize with the Authorization Code Grant - OAuth 2.0 flow""" |
| 105 | + use_scopes = [] |
| 106 | + |
| 107 | + if api == "Rooms": |
| 108 | + use_scopes.extend(ROOMS_SCOPES) |
| 109 | + elif api == "Click": |
| 110 | + use_scopes.extend(CLICK_SCOPES) |
| 111 | + elif api == "Admin": |
| 112 | + use_scopes.extend(ADMIN_SCOPES) |
| 113 | + elif api == "Maestro": |
| 114 | + use_scopes.extend(MAESTRO_SCOPES) |
| 115 | + elif api == "WebForms": |
| 116 | + use_scopes.extend(WEBFORMS_SCOPES) |
| 117 | + else: |
| 118 | + use_scopes.extend(SCOPES) |
| 119 | + # remove duplicate scopes |
| 120 | + use_scopes = list(set(use_scopes)) |
| 121 | + |
| 122 | + redirect_uri = DS_CONFIG["app_url"] + url_for("ds.ds_callback") |
| 123 | + cls.ds_app = OAuth2Session(DS_CONFIG["ds_client_id"], redirect_uri=redirect_uri, scope=use_scopes) |
| 124 | + |
95 | 125 | @classmethod
|
96 | 126 | def _jwt_auth(cls, api):
|
97 | 127 | """JSON Web Token authorization"""
|
@@ -152,15 +182,24 @@ def destroy(cls):
|
152 | 182 | def login(cls, auth_type, api):
|
153 | 183 | cls._init(auth_type, api)
|
154 | 184 | if auth_type == "code_grant":
|
155 |
| - return cls.get(auth_type, api).authorize(callback=url_for("ds.ds_callback", _external=True)) |
| 185 | + if session.get("pkce_failed", False): |
| 186 | + return cls.get(auth_type, api).authorize(callback=url_for("ds.ds_callback", _external=True)) |
| 187 | + else: |
| 188 | + code_verifier = cls.generate_code_verifier() |
| 189 | + code_challenge = cls.generate_code_challenge(code_verifier) |
| 190 | + session["code_verifier"] = code_verifier |
| 191 | + return redirect(cls.get_auth_url_with_pkce(code_challenge)) |
156 | 192 | elif auth_type == "jwt":
|
157 | 193 | return cls._jwt_auth(api)
|
158 | 194 |
|
159 | 195 | @classmethod
|
160 | 196 | def get_token(cls, auth_type):
|
161 | 197 | resp = None
|
162 | 198 | if auth_type == "code_grant":
|
163 |
| - resp = cls.get(auth_type).authorized_response() |
| 199 | + if session.get("pkce_failed", False): |
| 200 | + resp = cls.get(auth_type).authorized_response() |
| 201 | + else: |
| 202 | + return cls.fetch_token_with_pkce(request.url) |
164 | 203 | elif auth_type == "jwt":
|
165 | 204 | resp = cls.get(auth_type).to_dict()
|
166 | 205 |
|
@@ -189,3 +228,44 @@ def get(cls, auth_type, api=API_TYPE["ESIGNATURE"]):
|
189 | 228 | if not cls.ds_app:
|
190 | 229 | cls._init(auth_type, api)
|
191 | 230 | return cls.ds_app
|
| 231 | + |
| 232 | + @classmethod |
| 233 | + def generate_code_verifier(cls): |
| 234 | + # Generate a random 32-byte string and base64-url encode it |
| 235 | + return secrets.token_urlsafe(32) |
| 236 | + |
| 237 | + @classmethod |
| 238 | + def generate_code_challenge(cls, code_verifier): |
| 239 | + # Hash the code verifier using SHA-256 |
| 240 | + sha256_hash = hashlib.sha256(code_verifier.encode()).digest() |
| 241 | + |
| 242 | + # Base64 encode the hash and make it URL safe |
| 243 | + base64_encoded = base64.urlsafe_b64encode(sha256_hash).decode().rstrip('=') |
| 244 | + |
| 245 | + return base64_encoded |
| 246 | + |
| 247 | + @classmethod |
| 248 | + def get_auth_url_with_pkce(cls, code_challenge): |
| 249 | + authorize_url = DS_CONFIG["authorization_server"] + "/oauth/auth" |
| 250 | + auth_url, state = cls.ds_app.authorization_url( |
| 251 | + authorize_url, |
| 252 | + code_challenge=code_challenge, |
| 253 | + code_challenge_method='S256', # PKCE uses SHA-256 hashing, |
| 254 | + approval_prompt="auto" |
| 255 | + ) |
| 256 | + |
| 257 | + return auth_url |
| 258 | + |
| 259 | + @classmethod |
| 260 | + def fetch_token_with_pkce(cls, authorization_response): |
| 261 | + access_token_url = DS_CONFIG["authorization_server"] + "/oauth/token" |
| 262 | + token = cls.get("code_grant", session.get("api")).fetch_token( |
| 263 | + access_token_url, |
| 264 | + authorization_response=authorization_response, |
| 265 | + client_id=DS_CONFIG["ds_client_id"], |
| 266 | + client_secret=DS_CONFIG["ds_client_secret"], |
| 267 | + code_verifier=session["code_verifier"], |
| 268 | + code_challenge_method="S256" |
| 269 | + ) |
| 270 | + |
| 271 | + return token |
0 commit comments