Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions src/admin/blueprints/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,23 @@ def google_callback():
tenant_access = get_user_tenant_access(email)

# Build tenant list for selector (empty list is fine - user can create new tenant)
session["available_tenants"] = []
# Use a dict to track tenants by tenant_id to avoid duplicates
tenant_dict = {}

if tenant_access["domain_tenant"]:
session["available_tenants"].append(
{
"tenant_id": tenant_access["domain_tenant"].tenant_id,
"name": tenant_access["domain_tenant"].name,
"subdomain": tenant_access["domain_tenant"].subdomain,
"is_admin": True, # Domain users get admin access
}
)
domain_tenant = tenant_access["domain_tenant"]
tenant_dict[domain_tenant.tenant_id] = {
"tenant_id": domain_tenant.tenant_id,
"name": domain_tenant.name,
"subdomain": domain_tenant.subdomain,
"is_admin": True, # Domain users get admin access
}

for tenant in tenant_access["email_tenants"]:
# Skip if already added via domain access
if tenant.tenant_id in tenant_dict:
continue

# Check existing user record for role, default to admin
with get_db_session() as db_session:
from sqlalchemy import select
Expand All @@ -292,14 +296,15 @@ def google_callback():
existing_user = db_session.scalars(stmt).first()
is_admin = existing_user.role == "admin" if existing_user else True

session["available_tenants"].append(
{
"tenant_id": tenant.tenant_id,
"name": tenant.name,
"subdomain": tenant.subdomain,
"is_admin": is_admin,
}
)
tenant_dict[tenant.tenant_id] = {
"tenant_id": tenant.tenant_id,
"name": tenant.name,
"subdomain": tenant.subdomain,
"is_admin": is_admin,
}

# Convert dict to list for session
session["available_tenants"] = list(tenant_dict.values())

# Always show tenant selector (includes "Create New Tenant" option)
flash(f"Welcome {user.get('name', email)}!", "success")
Expand Down
72 changes: 72 additions & 0 deletions src/admin/tests/unit/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,75 @@ def test_tenant_login_rejects_unauthorized_email(self, mock_get_access, mock_get

# Verify unauthorized users are rejected (no user record creation)
assert True # If this test structure exists, the code path is tested


class TestDuplicateTenantPrevention:
"""Test prevention of duplicate tenant display in selector."""

@patch("src.admin.blueprints.auth.get_db_session")
@patch("src.admin.domain_access.get_user_tenant_access")
def test_tenant_not_duplicated_when_in_both_domain_and_email_lists(self, mock_get_access, mock_get_session):
"""Test that a tenant is not duplicated when user has access via both domain and email."""
# Setup: User has access to same tenant via both domain and email
mock_tenant = Mock()
mock_tenant.tenant_id = "accuweather"
mock_tenant.name = "AccuWeather"
mock_tenant.subdomain = "accuweather"

# Mock database session (no existing user record needed for this test)
mock_session = MagicMock()
mock_get_session.return_value.__enter__.return_value = mock_session
mock_session.scalars.return_value.first.return_value = None

# Mock tenant access - user has access via BOTH domain AND email
tenant_access = {
"domain_tenant": mock_tenant, # Access via domain
"email_tenants": [mock_tenant], # Also in email list (same tenant!)
"is_super_admin": False,
"total_access": 1, # Should be 1, not 2
}
mock_get_access.return_value = tenant_access

# Import the function we're testing
# Create test app with auth blueprint
from flask import Flask

from src.admin.blueprints.auth import auth_bp

app = Flask(__name__)
app.config["SECRET_KEY"] = "test_secret"
app.config["TESTING"] = True
app.register_blueprint(auth_bp)

with app.test_request_context():
# Simulate the logic from google_callback that builds available_tenants
tenant_access = mock_get_access.return_value
tenant_dict = {}

if tenant_access["domain_tenant"]:
domain_tenant = tenant_access["domain_tenant"]
tenant_dict[domain_tenant.tenant_id] = {
"tenant_id": domain_tenant.tenant_id,
"name": domain_tenant.name,
"subdomain": domain_tenant.subdomain,
"is_admin": True,
}

for tenant in tenant_access["email_tenants"]:
# Skip if already added via domain access
if tenant.tenant_id in tenant_dict:
continue

tenant_dict[tenant.tenant_id] = {
"tenant_id": tenant.tenant_id,
"name": tenant.name,
"subdomain": tenant.subdomain,
"is_admin": True,
}

available_tenants = list(tenant_dict.values())

# Verify: Should only have ONE entry, not two
assert len(available_tenants) == 1
assert available_tenants[0]["tenant_id"] == "accuweather"
assert available_tenants[0]["name"] == "AccuWeather"