Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for MFA login flow #757

Merged
merged 9 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hass_nabucasa/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ async def login(self, email: str, password: str) -> None:
"""Log a user in."""
await self.auth.async_login(email, password)

async def login_verify_totp(
self,
email: str,
code: str,
mfa_tokens: dict[str, Any],
) -> None:
"""Verify TOTP code during login."""
await self.auth.async_login_verify_totp(email, code, mfa_tokens)

async def logout(self) -> None:
"""Close connection and remove all credentials."""
self.id_token = None
Expand Down
62 changes: 60 additions & 2 deletions hass_nabucasa/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import botocore
from botocore.exceptions import BotoCoreError, ClientError
import pycognito
from pycognito.exceptions import ForceChangePasswordException
from pycognito.exceptions import ForceChangePasswordException, MFAChallengeException

from .const import MESSAGE_AUTH_FAIL

Expand All @@ -31,6 +31,21 @@ class Unauthenticated(CloudError):
"""Raised when authentication failed."""


class MFARequired(CloudError):
"""Raised when MFA is required."""

session_tokens: dict[str, str]
klejejs marked this conversation as resolved.
Show resolved Hide resolved
klejejs marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, session_tokens: dict[str, str]) -> None:
"""Initialize MFA required error."""
super().__init__("MFA required.")
self.session_tokens = session_tokens
klejejs marked this conversation as resolved.
Show resolved Hide resolved

klejejs marked this conversation as resolved.
Show resolved Hide resolved

class InvalidTotpCode(CloudError):
"""Raised when the TOTP code is invalid."""


class UserNotFound(CloudError):
"""Raised when a user is not found."""

Expand Down Expand Up @@ -58,6 +73,7 @@ class UnknownError(CloudError):


AWS_EXCEPTIONS: dict[str, type[CloudError]] = {
"CodeMismatchException": InvalidTotpCode,
"UserNotFoundException": UserNotFound,
"UserNotConfirmedException": UserNotConfirmed,
"UsernameExistsException": UserExists,
Expand Down Expand Up @@ -169,7 +185,7 @@ async def async_login(self, email: str, password: str) -> None:
async with self._request_lock:
assert not self.cloud.is_logged_in, "Cannot login if already logged in."

cognito = await self.cloud.run_executor(
cognito: pycognito.Cognito = await self.cloud.run_executor(
partial(self._create_cognito_client, username=email),
)

Expand All @@ -187,6 +203,9 @@ async def async_login(self, email: str, password: str) -> None:
if task:
await task

except MFAChallengeException as err:
raise MFARequired(err.get_tokens()) from err

except ForceChangePasswordException as err:
raise PasswordChangeRequired from err

Expand All @@ -196,6 +215,45 @@ async def async_login(self, email: str, password: str) -> None:
except BotoCoreError as err:
raise UnknownError from err

async def async_login_verify_totp(
self,
email: str,
code: str,
mfa_tokens: dict[str, Any],
) -> None:
"""Log user in and fetch certificate if MFA is required."""
try:
async with self._request_lock:
assert not self.cloud.is_logged_in, "Cannot login if already logged in."
klejejs marked this conversation as resolved.
Show resolved Hide resolved

cognito: pycognito.Cognito = await self.cloud.run_executor(
partial(self._create_cognito_client, username=email),
)

async with async_timeout.timeout(30):
await self.cloud.run_executor(
partial(
cognito.respond_to_software_token_mfa_challenge,
code=code,
mfa_tokens=mfa_tokens,
),
)

task = await self.cloud.update_token(
cognito.id_token,
cognito.access_token,
cognito.refresh_token,
)

if task:
await task

except ClientError as err:
raise _map_aws_exception(err) from err

except BotoCoreError as err:
raise UnknownError from err

async def async_check_token(self) -> None:
"""Check that the token is valid and renew if necessary."""
async with self._request_lock:
Expand Down
2 changes: 2 additions & 0 deletions hass_nabucasa/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

REQUEST_TIMEOUT = 10

LOGIN_MFA_CHALLENGE_EXPIRATION = 60
klejejs marked this conversation as resolved.
Show resolved Hide resolved

MODE_PROD = "production"
MODE_DEV = "development"

Expand Down
42 changes: 42 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest.mock import MagicMock, patch

from botocore.exceptions import ClientError
from pycognito.exceptions import MFAChallengeException
import pytest

from hass_nabucasa import auth as auth_api
Expand Down Expand Up @@ -55,6 +56,47 @@ async def test_login_user_not_confirmed(mock_cognito, mock_cloud):
assert len(mock_cloud.update_token.mock_calls) == 0


async def test_login_user_mfa_required(mock_cognito, mock_cloud):
"""Test trying to login without MFA when it is required."""
auth = auth_api.CognitoAuth(mock_cloud)
mock_cognito.authenticate.side_effect = MFAChallengeException("MFA required", {})

with pytest.raises(auth_api.MFARequired):
await auth.async_login("user", "pass")

assert len(mock_cloud.update_token.mock_calls) == 0


async def test_login_user_verify_totp_invalid_code(mock_cognito, mock_cloud):
"""Test trying to login with MFA when it is required."""
auth = auth_api.CognitoAuth(mock_cloud)
mock_cognito.respond_to_software_token_mfa_challenge.side_effect = aws_error(
"CodeMismatchException",
)

with pytest.raises(auth_api.InvalidTotpCode):
await auth.async_login_verify_totp("user", "123456", {"session": "session"})

assert len(mock_cloud.update_token.mock_calls) == 0


async def test_login_user_verify_totp(mock_cognito, mock_cloud):
"""Test trying to login with MFA when it is required."""
auth = auth_api.CognitoAuth(mock_cloud)
mock_cognito.id_token = "test_id_token"
mock_cognito.access_token = "test_access_token"
mock_cognito.refresh_token = "test_refresh_token"

await auth.async_login_verify_totp("user", "123456", {"session": "session"})

assert len(mock_cognito.respond_to_software_token_mfa_challenge.mock_calls) == 1
mock_cloud.update_token.assert_called_once_with(
"test_id_token",
"test_access_token",
"test_refresh_token",
)


async def test_login(mock_cognito, mock_cloud):
"""Test trying to login without confirming account."""
auth = auth_api.CognitoAuth(mock_cloud)
Expand Down
Loading