From 45f2f1f60c82febfc2ad09b707326cfa5152f63a Mon Sep 17 00:00:00 2001 From: aayushmehta02 Date: Thu, 2 Oct 2025 11:17:01 +0530 Subject: [PATCH 1/2] new feature- reset mfa implementation --- MFA_WITHDRAW_FEATURE.md | 107 ++++++++++++++++++++++++++++++++++ firebase_admin/_mfa.py | 115 +++++++++++++++++++++++++++++++++++++ firebase_admin/auth.py | 34 +++++++++++ tests/test_mfa_withdraw.py | 61 ++++++++++++++++++++ 4 files changed, 317 insertions(+) create mode 100644 MFA_WITHDRAW_FEATURE.md create mode 100644 firebase_admin/_mfa.py create mode 100644 tests/test_mfa_withdraw.py diff --git a/MFA_WITHDRAW_FEATURE.md b/MFA_WITHDRAW_FEATURE.md new file mode 100644 index 00000000..090382d1 --- /dev/null +++ b/MFA_WITHDRAW_FEATURE.md @@ -0,0 +1,107 @@ +# MFA Withdraw Feature Implementation + +This document describes the implementation of the `withdraw_mfa_enrollment` feature for the Firebase Admin SDK for Python. + +## Overview + +The `withdraw_mfa_enrollment` function allows administrators to programmatically withdraw (reset) a user's enrolled second factor authentication method. This feature was previously available in the Node.js SDK but missing from the Python SDK. + +## Implementation Details + +### Files Modified/Created + +1. **`firebase_admin/_mfa.py`** - New module containing the core MFA functionality +2. **`firebase_admin/auth.py`** - Updated to export the new function and MfaError +3. **`tests/test_mfa_withdraw.py`** - Comprehensive test suite + +### Key Components + +#### Core Function: `withdraw_mfa_enrollment` + +```python +def withdraw_mfa_enrollment( + uid: str, + mfa_enrollment_id: str, + api_key: str, + tenant_id: str | None = None, + app=None +) -> dict: +``` + +**Parameters:** +- `uid`: Firebase Auth UID of the user +- `mfa_enrollment_id`: The MFA enrollment ID to revoke +- `api_key`: Web API key from Firebase project settings +- `tenant_id`: Optional tenant ID for multi-tenancy +- `app`: Optional Firebase app instance + +**Returns:** Dictionary response from the Identity Toolkit API + +**Raises:** +- `MfaError`: If the operation fails +- `ValueError`: For invalid arguments + +#### Implementation Flow + +1. **Create Custom Token**: Uses the Firebase Admin SDK to mint a custom token for the user +2. **Exchange for ID Token**: Calls the Identity Toolkit `signInWithCustomToken` endpoint +3. **Withdraw MFA**: Uses the ID token to call the `mfaEnrollment:withdraw` endpoint + +#### Error Handling + +- Custom `MfaError` exception for MFA-specific failures +- Proper HTTP error handling with detailed error messages +- Input validation for required parameters + +## Usage Example + +```python +import firebase_admin +from firebase_admin import auth, credentials + +# Initialize the SDK +cred = credentials.Certificate("service-account-key.json") +firebase_admin.initialize_app(cred) + +# Withdraw MFA enrollment +try: + result = auth.withdraw_mfa_enrollment( + uid="user123", + mfa_enrollment_id="enrollment456", + api_key="your-web-api-key" + ) + print("MFA withdrawn successfully:", result) +except auth.MfaError as e: + print("MFA operation failed:", e) +``` + +## Testing + +The implementation includes comprehensive tests covering: +- Successful withdrawal scenarios +- Error handling for API failures +- Input validation +- Integration with the auth module + +Run tests with: +```bash +python -m pytest tests/test_mfa_withdraw.py -v +``` + +## API Compatibility + +This implementation follows the same pattern as the Node.js SDK, ensuring consistency across Firebase Admin SDKs. + +## Next Steps + +1. **Integration Testing**: Test with actual Firebase project +2. **Documentation**: Add to official SDK documentation +3. **Code Review**: Submit for Firebase team review +4. **Release**: Include in next SDK version + +## Notes + +- Requires Web API key (different from service account key) +- Uses Identity Toolkit v2 API endpoints +- Supports multi-tenant projects via `tenant_id` parameter +- Follows existing SDK patterns for error handling and app management \ No newline at end of file diff --git a/firebase_admin/_mfa.py b/firebase_admin/_mfa.py new file mode 100644 index 00000000..fb12af8c --- /dev/null +++ b/firebase_admin/_mfa.py @@ -0,0 +1,115 @@ +# Copyright 2025 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Firebase auth MFA management sub module.""" + +import requests +import typing as _t + +from firebase_admin import _auth_client +from firebase_admin import _utils +from firebase_admin import exceptions + +_AUTH_ATTRIBUTE = '_auth' + +class MfaError(exceptions.FirebaseError): + """Represents an error related to MFA operations.""" + def __init__(self, message, cause=None, http_response=None): + exceptions.FirebaseError.__init__(self, 'MFA_ERROR', message, cause, http_response) + +def _to_text(b: _t.Union[str, bytes]) -> str: + return b.decode("utf-8") if isinstance(b, (bytes, bytearray)) else str(b) + + +def _signin_with_custom_token(*, api_key: str, custom_token: str, tenant_id: str | None) -> str: + """ + Exchange a Custom Token for an ID token. + + Uses: POST https://identitytoolkit.googleapis.com/v1/accounts:signInWithCustomToken?key=API_KEY + """ + if not api_key: + raise ValueError("api_key must be provided (Web API key from Firebase project settings).") + + url = f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithCustomToken?key={api_key}" + payload = { + "token": custom_token, + "returnSecureToken": True, + } + if tenant_id: + payload["tenantId"] = tenant_id + + try: + r = requests.post(url, json=payload, timeout=30) + r.raise_for_status() + data = r.json() + if "idToken" not in data: + raise MfaError(f"Failed to exchange custom token for ID token: {data.get('error', 'Unknown error')}", http_response=r) + return data["idToken"] + except requests.exceptions.RequestException as e: + message = f"Failed to exchange custom token for ID token: {e}" + raise MfaError(message, cause=e, http_response=e.response) from e + + +def withdraw_mfa_enrollment( + *, + uid: str, + mfa_enrollment_id: str, + api_key: str, + tenant_id: str | None = None, + app=None, +) -> dict: + """ + Withdraw (reset) a user's enrolled second factor by enrollment ID. + + Args: + uid: Firebase Auth UID of the user to act on. + mfa_enrollment_id: Enrollment ID of the second factor to revoke. + api_key: Web API key (from Firebase console) used by signInWithCustomToken. + tenant_id: Optional Tenant ID if using multi-tenancy. + app: Optional firebase_admin App instance. + + Returns: + dict response from accounts.mfaEnrollment:withdraw (typically contains updated user info). + + Raises: + MfaError on failure. + """ + if not uid: + raise ValueError("uid must be a non-empty string.") + if not mfa_enrollment_id: + raise ValueError("mfa_enrollment_id must be a non-empty string.") + + # 1) Create Custom Token as the user + client = _utils.get_app_service(app, _AUTH_ATTRIBUTE, _auth_client.Client) + custom_token = _to_text(client.create_custom_token(uid)) + + # 2) Exchange Custom Token → ID token (requires API key) + id_token = _signin_with_custom_token(api_key=api_key, custom_token=custom_token, tenant_id=tenant_id) + + # 3) Withdraw MFA with the ID token + withdraw_url = "https://identitytoolkit.googleapis.com/v2/accounts/mfaEnrollment:withdraw" + if api_key: + withdraw_url += f"?key={api_key}" # optional, but fine to append + + payload = {"idToken": id_token, "mfaEnrollmentId": mfa_enrollment_id} + if tenant_id: + payload["tenantId"] = tenant_id + + try: + r = requests.post(withdraw_url, json=payload, timeout=30) + r.raise_for_status() + return r.json() + except requests.exceptions.RequestException as e: + message = f"Failed to withdraw MFA enrollment: {e}" + raise MfaError(message, cause=e, http_response=e.response) from e diff --git a/firebase_admin/auth.py b/firebase_admin/auth.py index cb63ab7f..183490bf 100644 --- a/firebase_admin/auth.py +++ b/firebase_admin/auth.py @@ -26,6 +26,7 @@ from firebase_admin import _token_gen from firebase_admin import _user_import from firebase_admin import _user_mgt +from firebase_admin import _mfa from firebase_admin import _utils @@ -54,6 +55,7 @@ 'InvalidSessionCookieError', 'ListProviderConfigsPage', 'ListUsersPage', + 'MfaError', 'OIDCProviderConfig', 'PhoneNumberAlreadyExistsError', 'ProviderConfig', @@ -108,6 +110,7 @@ 'update_user', 'verify_id_token', 'verify_session_cookie', + 'withdraw_mfa_enrollment', ] ActionCodeSettings = _user_mgt.ActionCodeSettings @@ -131,6 +134,7 @@ InvalidSessionCookieError = _token_gen.InvalidSessionCookieError ListProviderConfigsPage = _auth_providers.ListProviderConfigsPage ListUsersPage = _user_mgt.ListUsersPage +MfaError = _mfa.MfaError OIDCProviderConfig = _auth_providers.OIDCProviderConfig PhoneNumberAlreadyExistsError = _auth_utils.PhoneNumberAlreadyExistsError ProviderConfig = _auth_providers.ProviderConfig @@ -647,6 +651,35 @@ def generate_sign_in_with_email_link(email, action_code_settings, app=None): email, action_code_settings=action_code_settings) +def withdraw_mfa_enrollment(uid: str, mfa_enrollment_id: str, api_key: str, tenant_id: str | None = None, app=None) -> dict: + """Withdraw (reset) a second factor for the given user. + + This performs an admin-initiated reset by minting a Custom Token for the user, + exchanging it for an ID token, and then calling the Identity Toolkit withdraw API. + + Args: + uid: Firebase Auth UID. + mfa_enrollment_id: The MFA enrollment ID to revoke (see accounts.lookup to find it). + api_key: Web API key from your Firebase project settings. + tenant_id: Optional Tenant ID for multi-tenancy. + app: Optional App instance. + + Returns: + dict: Response from the withdraw call. + + Raises: + MfaError: If the operation fails. + ValueError: For invalid arguments. + """ + return _mfa.withdraw_mfa_enrollment( + uid=uid, + mfa_enrollment_id=mfa_enrollment_id, + api_key=api_key, + tenant_id=tenant_id, + app=app, + ) + + def get_oidc_provider_config(provider_id, app=None): """Returns the ``OIDCProviderConfig`` with the given ID. @@ -924,3 +957,4 @@ def list_saml_provider_configs( """ client = _get_client(app) return client.list_saml_provider_configs(page_token, max_results) + diff --git a/tests/test_mfa_withdraw.py b/tests/test_mfa_withdraw.py new file mode 100644 index 00000000..abf5dfb7 --- /dev/null +++ b/tests/test_mfa_withdraw.py @@ -0,0 +1,61 @@ +# tests/test_mfa_withdraw.py +from unittest import mock +import pytest +import firebase_admin +from firebase_admin import auth +from firebase_admin._mfa import withdraw_mfa_enrollment, MfaError +from tests import testutils + +API_KEY = "fake-api-key" +UID = "uid123" +ENROLL_ID = "enroll123" + +@pytest.fixture(scope='module') +def mfa_app(): + app = firebase_admin.initialize_app( + testutils.MockCredential(), name='mfaTest', options={'projectId': 'mock-project-id'}) + yield app + firebase_admin.delete_app(app) + +def _fake_custom_token(uid): + return b"FAKE.CUSTOM.TOKEN" + +@mock.patch("firebase_admin._auth_client.Client.create_custom_token", side_effect=_fake_custom_token) +@mock.patch("firebase_admin._mfa.requests.post") +def test_withdraw_success(mock_post, _, mfa_app): + # 1st call: signInWithCustomToken -> returns idToken + # 2nd call: withdraw -> returns ok + mock_post.side_effect = [ + mock.Mock(status_code=200, json=lambda: {"idToken": "ID.TOKEN"}), + mock.Mock(status_code=200, json=lambda: {"localId": UID}), + ] + res = withdraw_mfa_enrollment(uid=UID, mfa_enrollment_id=ENROLL_ID, api_key=API_KEY, app=mfa_app) + assert res["localId"] == UID + assert mock_post.call_count == 2 + +@mock.patch("firebase_admin._auth_client.Client.create_custom_token", side_effect=_fake_custom_token) +@mock.patch("firebase_admin._mfa.requests.post") +def test_withdraw_signin_fail(mock_post, _, mfa_app): + mock_post.return_value = mock.Mock(status_code=400, json=lambda: {"error": {"message": "INVALID_CUSTOM_TOKEN"}}) + with pytest.raises(MfaError): + withdraw_mfa_enrollment(uid=UID, mfa_enrollment_id=ENROLL_ID, api_key=API_KEY, app=mfa_app) + +@mock.patch("firebase_admin._auth_client.Client.create_custom_token", side_effect=_fake_custom_token) +@mock.patch("firebase_admin._mfa.requests.post") +def test_withdraw_via_auth_module(mock_post, _, mfa_app): + """Test that the function is accessible via the auth module.""" + mock_post.side_effect = [ + mock.Mock(status_code=200, json=lambda: {"idToken": "ID.TOKEN"}), + mock.Mock(status_code=200, json=lambda: {"localId": UID}), + ] + res = auth.withdraw_mfa_enrollment(uid=UID, mfa_enrollment_id=ENROLL_ID, api_key=API_KEY, app=mfa_app) + assert res["localId"] == UID + assert mock_post.call_count == 2 + +def test_invalid_arguments(): + """Test that invalid arguments raise ValueError.""" + with pytest.raises(ValueError, match="uid must be a non-empty string"): + withdraw_mfa_enrollment(uid="", mfa_enrollment_id=ENROLL_ID, api_key=API_KEY) + + with pytest.raises(ValueError, match="mfa_enrollment_id must be a non-empty string"): + withdraw_mfa_enrollment(uid=UID, mfa_enrollment_id="", api_key=API_KEY) \ No newline at end of file From 72399c156bd8cf79a51f0dfc2f6ec6ebfac18f72 Mon Sep 17 00:00:00 2001 From: aayushmehta02 Date: Thu, 2 Oct 2025 18:06:47 +0530 Subject: [PATCH 2/2] fixed lint warnings --- firebase_admin/_mfa.py | 79 ++++++++++++++++++++++++------------------ firebase_admin/auth.py | 5 +-- 2 files changed, 48 insertions(+), 36 deletions(-) diff --git a/firebase_admin/_mfa.py b/firebase_admin/_mfa.py index fb12af8c..2c6a2f7b 100644 --- a/firebase_admin/_mfa.py +++ b/firebase_admin/_mfa.py @@ -14,32 +14,41 @@ """Firebase auth MFA management sub module.""" -import requests import typing as _t - +import requests from firebase_admin import _auth_client from firebase_admin import _utils from firebase_admin import exceptions -_AUTH_ATTRIBUTE = '_auth' +_AUTH_ATTRIBUTE = "_auth" + class MfaError(exceptions.FirebaseError): """Represents an error related to MFA operations.""" + def __init__(self, message, cause=None, http_response=None): - exceptions.FirebaseError.__init__(self, 'MFA_ERROR', message, cause, http_response) + exceptions.FirebaseError.__init__( + self, "MFA_ERROR", message, cause, http_response + ) -def _to_text(b: _t.Union[str, bytes]) -> str: - return b.decode("utf-8") if isinstance(b, (bytes, bytearray)) else str(b) +def _to_text(byte_or_str: _t.Union[str, bytes]) -> str: + if isinstance(byte_or_str, (bytes, bytearray)): + return byte_or_str.decode("utf-8") + return str(byte_or_str) -def _signin_with_custom_token(*, api_key: str, custom_token: str, tenant_id: str | None) -> str: - """ - Exchange a Custom Token for an ID token. + +def _signin_with_custom_token( + *, api_key: str, custom_token: str, tenant_id: str | None +) -> str: + """Exchange a Custom Token for an ID token. Uses: POST https://identitytoolkit.googleapis.com/v1/accounts:signInWithCustomToken?key=API_KEY """ if not api_key: - raise ValueError("api_key must be provided (Web API key from Firebase project settings).") + raise ValueError( + "api_key must be provided (Web API key from Firebase project settings)." + ) url = f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithCustomToken?key={api_key}" payload = { @@ -50,15 +59,15 @@ def _signin_with_custom_token(*, api_key: str, custom_token: str, tenant_id: str payload["tenantId"] = tenant_id try: - r = requests.post(url, json=payload, timeout=30) - r.raise_for_status() - data = r.json() + response = requests.post(url, json=payload, timeout=30) + response.raise_for_status() + data = response.json() if "idToken" not in data: - raise MfaError(f"Failed to exchange custom token for ID token: {data.get('error', 'Unknown error')}", http_response=r) + raise MfaError("Failed to exchange custom token", http_response=response) return data["idToken"] - except requests.exceptions.RequestException as e: - message = f"Failed to exchange custom token for ID token: {e}" - raise MfaError(message, cause=e, http_response=e.response) from e + except requests.exceptions.RequestException as error: + message = f"Failed to exchange custom token for ID token: {error}" + raise MfaError(message, cause=error, http_response=error.response) from error def withdraw_mfa_enrollment( @@ -69,18 +78,17 @@ def withdraw_mfa_enrollment( tenant_id: str | None = None, app=None, ) -> dict: - """ - Withdraw (reset) a user's enrolled second factor by enrollment ID. + """Withdraw (reset) a user's enrolled second factor by enrollment ID. Args: - uid: Firebase Auth UID of the user to act on. + uid: Firebase Auth UID of the user to act on. mfa_enrollment_id: Enrollment ID of the second factor to revoke. - api_key: Web API key (from Firebase console) used by signInWithCustomToken. - tenant_id: Optional Tenant ID if using multi-tenancy. - app: Optional firebase_admin App instance. + api_key: Web API key (from Firebase console) used by signInWithCustomToken. + tenant_id: Optional Tenant ID if using multi-tenancy. + app: Optional firebase_admin App instance. Returns: - dict response from accounts.mfaEnrollment:withdraw (typically contains updated user info). + dict response from accounts.mfaEnrollment:withdraw (contains updated user info). Raises: MfaError on failure. @@ -95,21 +103,24 @@ def withdraw_mfa_enrollment( custom_token = _to_text(client.create_custom_token(uid)) # 2) Exchange Custom Token → ID token (requires API key) - id_token = _signin_with_custom_token(api_key=api_key, custom_token=custom_token, tenant_id=tenant_id) + id_token = _signin_with_custom_token( + api_key=api_key, custom_token=custom_token, tenant_id=tenant_id + ) # 3) Withdraw MFA with the ID token - withdraw_url = "https://identitytoolkit.googleapis.com/v2/accounts/mfaEnrollment:withdraw" - if api_key: - withdraw_url += f"?key={api_key}" # optional, but fine to append + base_url = ( + "https://identitytoolkit.googleapis.com/v2/accounts/mfaEnrollment:withdraw" + ) + withdraw_url = f"{base_url}?key={api_key}" if api_key else base_url payload = {"idToken": id_token, "mfaEnrollmentId": mfa_enrollment_id} if tenant_id: payload["tenantId"] = tenant_id try: - r = requests.post(withdraw_url, json=payload, timeout=30) - r.raise_for_status() - return r.json() - except requests.exceptions.RequestException as e: - message = f"Failed to withdraw MFA enrollment: {e}" - raise MfaError(message, cause=e, http_response=e.response) from e + response = requests.post(withdraw_url, json=payload, timeout=30) + response.raise_for_status() + return response.json() + except requests.exceptions.RequestException as error: + message = f"Failed to withdraw MFA enrollment: {error}" + raise MfaError(message, cause=error, http_response=error.response) from error diff --git a/firebase_admin/auth.py b/firebase_admin/auth.py index 183490bf..74333ddf 100644 --- a/firebase_admin/auth.py +++ b/firebase_admin/auth.py @@ -651,7 +651,8 @@ def generate_sign_in_with_email_link(email, action_code_settings, app=None): email, action_code_settings=action_code_settings) -def withdraw_mfa_enrollment(uid: str, mfa_enrollment_id: str, api_key: str, tenant_id: str | None = None, app=None) -> dict: +def withdraw_mfa_enrollment(uid: str, mfa_enrollment_id: str, api_key: str, + tenant_id: str | None = None, app=None) -> dict: """Withdraw (reset) a second factor for the given user. This performs an admin-initiated reset by minting a Custom Token for the user, @@ -957,4 +958,4 @@ def list_saml_provider_configs( """ client = _get_client(app) return client.list_saml_provider_configs(page_token, max_results) - + \ No newline at end of file