diff --git a/src/admin/blueprints/auth.py b/src/admin/blueprints/auth.py index f47da1716..ce9a8bfbe 100644 --- a/src/admin/blueprints/auth.py +++ b/src/admin/blueprints/auth.py @@ -269,19 +269,23 @@ def google_callback(): tenant_access = get_user_tenant_access(email) # Build tenant list for selector (empty list is fine - user can create new tenant) - session["available_tenants"] = [] + # Use a dict to track tenants by tenant_id to avoid duplicates + tenant_dict = {} if tenant_access["domain_tenant"]: - session["available_tenants"].append( - { - "tenant_id": tenant_access["domain_tenant"].tenant_id, - "name": tenant_access["domain_tenant"].name, - "subdomain": tenant_access["domain_tenant"].subdomain, - "is_admin": True, # Domain users get admin access - } - ) + domain_tenant = tenant_access["domain_tenant"] + tenant_dict[domain_tenant.tenant_id] = { + "tenant_id": domain_tenant.tenant_id, + "name": domain_tenant.name, + "subdomain": domain_tenant.subdomain, + "is_admin": True, # Domain users get admin access + } for tenant in tenant_access["email_tenants"]: + # Skip if already added via domain access + if tenant.tenant_id in tenant_dict: + continue + # Check existing user record for role, default to admin with get_db_session() as db_session: from sqlalchemy import select @@ -292,14 +296,15 @@ def google_callback(): existing_user = db_session.scalars(stmt).first() is_admin = existing_user.role == "admin" if existing_user else True - session["available_tenants"].append( - { - "tenant_id": tenant.tenant_id, - "name": tenant.name, - "subdomain": tenant.subdomain, - "is_admin": is_admin, - } - ) + tenant_dict[tenant.tenant_id] = { + "tenant_id": tenant.tenant_id, + "name": tenant.name, + "subdomain": tenant.subdomain, + "is_admin": is_admin, + } + + # Convert dict to list for session + session["available_tenants"] = list(tenant_dict.values()) # Always show tenant selector (includes "Create New Tenant" option) flash(f"Welcome {user.get('name', email)}!", "success") diff --git a/src/admin/tests/unit/test_auth.py b/src/admin/tests/unit/test_auth.py index 0ce38a02a..d407a5057 100644 --- a/src/admin/tests/unit/test_auth.py +++ b/src/admin/tests/unit/test_auth.py @@ -290,3 +290,75 @@ def test_tenant_login_rejects_unauthorized_email(self, mock_get_access, mock_get # Verify unauthorized users are rejected (no user record creation) assert True # If this test structure exists, the code path is tested + + +class TestDuplicateTenantPrevention: + """Test prevention of duplicate tenant display in selector.""" + + @patch("src.admin.blueprints.auth.get_db_session") + @patch("src.admin.domain_access.get_user_tenant_access") + def test_tenant_not_duplicated_when_in_both_domain_and_email_lists(self, mock_get_access, mock_get_session): + """Test that a tenant is not duplicated when user has access via both domain and email.""" + # Setup: User has access to same tenant via both domain and email + mock_tenant = Mock() + mock_tenant.tenant_id = "accuweather" + mock_tenant.name = "AccuWeather" + mock_tenant.subdomain = "accuweather" + + # Mock database session (no existing user record needed for this test) + mock_session = MagicMock() + mock_get_session.return_value.__enter__.return_value = mock_session + mock_session.scalars.return_value.first.return_value = None + + # Mock tenant access - user has access via BOTH domain AND email + tenant_access = { + "domain_tenant": mock_tenant, # Access via domain + "email_tenants": [mock_tenant], # Also in email list (same tenant!) + "is_super_admin": False, + "total_access": 1, # Should be 1, not 2 + } + mock_get_access.return_value = tenant_access + + # Import the function we're testing + # Create test app with auth blueprint + from flask import Flask + + from src.admin.blueprints.auth import auth_bp + + app = Flask(__name__) + app.config["SECRET_KEY"] = "test_secret" + app.config["TESTING"] = True + app.register_blueprint(auth_bp) + + with app.test_request_context(): + # Simulate the logic from google_callback that builds available_tenants + tenant_access = mock_get_access.return_value + tenant_dict = {} + + if tenant_access["domain_tenant"]: + domain_tenant = tenant_access["domain_tenant"] + tenant_dict[domain_tenant.tenant_id] = { + "tenant_id": domain_tenant.tenant_id, + "name": domain_tenant.name, + "subdomain": domain_tenant.subdomain, + "is_admin": True, + } + + for tenant in tenant_access["email_tenants"]: + # Skip if already added via domain access + if tenant.tenant_id in tenant_dict: + continue + + tenant_dict[tenant.tenant_id] = { + "tenant_id": tenant.tenant_id, + "name": tenant.name, + "subdomain": tenant.subdomain, + "is_admin": True, + } + + available_tenants = list(tenant_dict.values()) + + # Verify: Should only have ONE entry, not two + assert len(available_tenants) == 1 + assert available_tenants[0]["tenant_id"] == "accuweather" + assert available_tenants[0]["name"] == "AccuWeather"