Skip to content
This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Commit

Permalink
Use browser login instead of device flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Porges committed Nov 10, 2022
1 parent ff85b80 commit 486905f
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions src/cli/onefuzz/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def __init__(
self.config = config
self.token_cache: Optional[msal.SerializableTokenCache] = None
self.init_cache()
self.app: Optional[Any] = None
self.app: Optional[msal.ClientApplication] = None
self.token_expires = 0
self.load_config()
self.session = requests.Session()
Expand Down Expand Up @@ -197,7 +197,7 @@ def get_access_token(self) -> Any:
if self.client_secret:
return self.access_token_from_client_secret(scopes)

return self.device_login(scopes)
return self.do_login(scopes)

def access_token_from_client_secret(self, scopes: List[str]) -> Any:
if not self.app:
Expand Down Expand Up @@ -229,14 +229,15 @@ def access_token_from_client_secret(self, scopes: List[str]) -> Any:
)
return result

def device_login(self, scopes: List[str]) -> Any:
def do_login(self, scopes: List[str]) -> Any:
if not self.app:
self.app = msal.PublicClientApplication(
self.config.client_id,
authority=self.config.authority,
token_cache=self.token_cache,
)

access_token = None
for scope in scopes:
accounts = self.app.get_accounts()
if accounts:
Expand All @@ -248,26 +249,33 @@ def device_login(self, scopes: List[str]) -> Any:

for scope in scopes:
LOGGER.info("Attempting interactive device login")
print("Please login", flush=True)

flow = self.app.initiate_device_flow(scopes=[scope])

check_msal_error(flow, ["user_code", "message"])
# setting the expiration time to allow us to retry the interactive login with a new scope
flow["expires_at"] = int(time.time()) + 90 # 90 seconds from now
print(flow["message"], flush=True)

access_token = self.app.acquire_token_by_device_flow(flow)
# AADSTS70016: OAuth 2.0 device flow error. Authorization is pending
# this happens when the intractive login request times out. This heppens when the login
# fails because of a scope mismatch.
if (
"error" in access_token
and "AADSTS70016" in access_token["error_description"]
):
LOGGER.warning(f"failed to get access token with scope {scope}")
continue
check_msal_error(access_token, ["access_token"])
try:
access_token = self.app.acquire_token_interactive(scopes=[scope])
check_msal_error(access_token, ["access_token"])
except KeyboardInterrupt:
result = input("\nInteractive login cancelled. Use device login (Y/n)? ")
if result == "" or result.startswith("y") or result.startswith("Y"):
print("Falling back to device flow, please sign in:", flush=True)
flow = self.app.initiate_device_flow(scopes=[scope])

check_msal_error(flow, ["user_code", "message"])
# setting the expiration time to allow us to retry the interactive login with a new scope
flow["expires_at"] = int(time.time()) + 90 # 90 seconds from now
print(flow["message"], flush=True)

access_token = self.app.acquire_token_by_device_flow(flow)
# AADSTS70016: OAuth 2.0 device flow error. Authorization is pending
# this happens when the intractive login request times out. This heppens when the login
# fails because of a scope mismatch.
if (
"error" in access_token
and "AADSTS70016" in access_token["error_description"]
):
LOGGER.warning(f"failed to get access token with scope {scope}")
continue
check_msal_error(access_token, ["access_token"])
else:
continue

LOGGER.info("Interactive device authentication succeeded")
print("Login succeeded", flush=True)
Expand Down

0 comments on commit 486905f

Please sign in to comment.