Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dump_auth_config management cmd (for SAML and LDAP) #14947

Merged
merged 10 commits into from
Mar 15, 2024
150 changes: 150 additions & 0 deletions awx/main/management/commands/dump_auth_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import json
import os
import sys
import re

from typing import Any
from django.core.management.base import BaseCommand
from django.conf import settings
from awx.conf import settings_registry


class Command(BaseCommand):
help = 'Dump the current auth configuration in django_ansible_base.authenticator format, currently support LDAP'
jessicamack marked this conversation as resolved.
Show resolved Hide resolved

DAB_SAML_AUTHENTICATOR_KEYS = {
"SP_ENTITY_ID": True,
"SP_PUBLIC_CERT": True,
"SP_PRIVATE_KEY": True,
"ORG_INFO": True,
"TECHNICAL_CONTACT": True,
"SUPPORT_CONTACT": True,
"SP_EXTRA": False,
"SECURITY_CONFIG": False,
"EXTRA_DATA": False,
"ENABLED_IDPS": True,
"CALLBACK_URL": False,
}

DAB_LDAP_AUTHENTICATOR_KEYS = {
"SERVER_URI": True,
"BIND_DN": False,
"BIND_PASSWORD": False,
"CONNECTION_OPTIONS": False,
"GROUP_TYPE": True,
"GROUP_TYPE_PARAMS": True,
"GROUP_SEARCH": False,
"START_TLS": False,
"USER_DN_TEMPLATE": True,
"USER_ATTR_MAP": True,
"USER_SEARCH": False,
}

def get_awx_ldap_settings(self) -> dict[str, dict[str, Any]]:
awx_ldap_settings = {}

for awx_ldap_setting in settings_registry.get_registered_settings(category_slug='ldap'):
key = awx_ldap_setting.removeprefix("AUTH_LDAP_")
value = getattr(settings, awx_ldap_setting, None)
awx_ldap_settings[key] = value

grouped_settings = {}

for key, value in awx_ldap_settings.items():
match = re.search(r'(\d+)', key)
index = int(match.group()) if match else 0
new_key = re.sub(r'\d+_', '', key)

if index not in grouped_settings:
grouped_settings[index] = {}

grouped_settings[index][new_key] = value
if new_key == "GROUP_TYPE" and value:
grouped_settings[index][new_key] = type(value).__name__

return grouped_settings

def is_enabled(self, settings, keys):
for key, required in keys.items():
if required and not settings.get(key):
return False
return True

def get_awx_saml_settings(self) -> dict[str, Any]:
awx_saml_settings = {}
for awx_saml_setting in settings_registry.get_registered_settings(category_slug='saml'):
awx_saml_settings[awx_saml_setting.removeprefix("SOCIAL_AUTH_SAML_")] = getattr(settings, awx_saml_setting, None)

return awx_saml_settings

def format_config_data(self, enabled, awx_settings, type, keys):
config = {
"type": f"awx.authentication.authenticator_plugins.{type}",
"enabled": enabled,
"configuration": {},
}
for k in keys:
v = awx_settings.get(k)
config["configuration"].update({k: v})

return config

def add_arguments(self, parser):
parser.add_argument(
"output_file",
nargs="?",
type=str,
default=None,
help="Output JSON file path",
)

def handle(self, *args, **options):
try:
data = []

# dump SAML settings
awx_saml_settings = self.get_awx_saml_settings()
awx_saml_enabled = self.is_enabled(awx_saml_settings, self.DAB_SAML_AUTHENTICATOR_KEYS)
if awx_saml_enabled:
data.append(
self.format_config_data(
awx_saml_enabled,
awx_saml_settings,
"saml",
self.DAB_SAML_AUTHENTICATOR_KEYS,
)
)

# dump LDAP settings
awx_ldap_group_settings = self.get_awx_ldap_settings()
for awx_ldap_settings in awx_ldap_group_settings.values():
enabled = self.is_enabled(awx_ldap_settings, self.DAB_LDAP_AUTHENTICATOR_KEYS)
if enabled:
data.append(
self.format_config_data(
enabled,
awx_ldap_settings,
"ldap",
self.DAB_LDAP_AUTHENTICATOR_KEYS,
)
)

# write to file if requested
if options["output_file"]:
# Define the path for the output JSON file
output_file = options["output_file"]

# Ensure the directory exists
os.makedirs(os.path.dirname(output_file), exist_ok=True)

# Write data to the JSON file
with open(output_file, "w") as f:
json.dump(data, f, indent=4)

self.stdout.write(self.style.SUCCESS(f"Auth config data dumped to {output_file}"))
else:
self.stdout.write(json.dumps(data, indent=4))

except Exception as e:
self.stdout.write(self.style.ERROR(f"An error occurred: {str(e)}"))
sys.exit(1)
80 changes: 80 additions & 0 deletions awx/main/tests/unit/commands/test_dump_auth_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from io import StringIO
import json
from django.core.management import call_command
from django.test import TestCase, override_settings


# SAML
@override_settings(SOCIAL_AUTH_SAML_SP_ENTITY_ID="SP_ENTITY_ID")
jessicamack marked this conversation as resolved.
Show resolved Hide resolved
@override_settings(SOCIAL_AUTH_SAML_SP_PUBLIC_CERT="SP_PUBLIC_CERT")
@override_settings(SOCIAL_AUTH_SAML_SP_PRIVATE_KEY="SP_PRIVATE_KEY")
@override_settings(SOCIAL_AUTH_SAML_ORG_INFO="ORG_INFO")
@override_settings(SOCIAL_AUTH_SAML_TECHNICAL_CONTACT="TECHNICAL_CONTACT")
@override_settings(SOCIAL_AUTH_SAML_SUPPORT_CONTACT="SUPPORT_CONTACT")
@override_settings(SOCIAL_AUTH_SAML_SP_EXTRA="SP_EXTRA")
@override_settings(SOCIAL_AUTH_SAML_SECURITY_CONFIG="SECURITY_CONFIG")
@override_settings(SOCIAL_AUTH_SAML_EXTRA_DATA="EXTRA_DATA")
@override_settings(SOCIAL_AUTH_SAML_ENABLED_IDPS="ENABLED_IDPS")

# LDAP
@override_settings(AUTH_LDAP_1_SERVER_URI=["SERVER_URI"])
@override_settings(AUTH_LDAP_1_BIND_DN="BIND_DN")
@override_settings(AUTH_LDAP_1_BIND_PASSWORD="BIND_PASSWORD")
@override_settings(AUTH_LDAP_1_GROUP_SEARCH=["GROUP_SEARCH"])
@override_settings(AUTH_LDAP_1_GROUP_TYPE="string object")
jessicamack marked this conversation as resolved.
Show resolved Hide resolved
@override_settings(AUTH_LDAP_1_GROUP_TYPE_PARAMS={"member_attr": "member", "name_attr": "cn"})
@override_settings(AUTH_LDAP_1_USER_DN_TEMPLATE="USER_DN_TEMPLATE")
@override_settings(AUTH_LDAP_1_USER_SEARCH=["USER_SEARCH"])
@override_settings(
AUTH_LDAP_1_USER_ATTR_MAP={
"email": "email",
"last_name": "last_name",
"first_name": "first_name",
}
)
@override_settings(AUTH_LDAP_1_CONNECTION_OPTIONS={})
@override_settings(AUTH_LDAP_1_START_TLS=None)
class TestDumpAuthConfigCommand(TestCase):
def setUp(self):
super().setUp()
self.expected_config = [
{
"type": "awx.authentication.authenticator_plugins.saml",
"enabled": True,
"configuration": {
"SP_ENTITY_ID": "SP_ENTITY_ID",
"SP_PUBLIC_CERT": "SP_PUBLIC_CERT",
"SP_PRIVATE_KEY": "SP_PRIVATE_KEY",
"ORG_INFO": "ORG_INFO",
"TECHNICAL_CONTACT": "TECHNICAL_CONTACT",
"SUPPORT_CONTACT": "SUPPORT_CONTACT",
"SP_EXTRA": "SP_EXTRA",
"SECURITY_CONFIG": "SECURITY_CONFIG",
"EXTRA_DATA": "EXTRA_DATA",
"ENABLED_IDPS": "ENABLED_IDPS",
"CALLBACK_URL": "CALLBACK_URL",
},
},
{
"type": "awx.authentication.authenticator_plugins.ldap",
"enabled": True,
"configuration": {
"SERVER_URI": ["SERVER_URI"],
"BIND_DN": "BIND_DN",
"BIND_PASSWORD": "BIND_PASSWORD",
"GROUP_SEARCH": ["GROUP_SEARCH"],
"GROUP_TYPE": "string object",
"GROUP_TYPE_PARAMS": {"member_attr": "member", "name_attr": "cn"},
"USER_DN_TEMPLATE": "USER_DN_TEMPLATE",
"USER_SEARCH": ["USER_SEARCH"],
"USER_ATTR_MAP": {"email": "email", "last_name": "last_name", "first_name": "first_name"},
"CONNECTION_OPTIONS": {},
"START_TLS": None,
},
},
]

def test_json_returned_from_cmd(self):
fosterseth marked this conversation as resolved.
Show resolved Hide resolved
output = StringIO()
call_command("dump-auth-config", stdout=output)
jessicamack marked this conversation as resolved.
Show resolved Hide resolved
assert output.getvalue().rstrip() == json.dumps(self.expected_config)
Loading