diff --git a/src/powerpwn/cli/arguments.py b/src/powerpwn/cli/arguments.py index f5e225c..faf86b5 100644 --- a/src/powerpwn/cli/arguments.py +++ b/src/powerpwn/cli/arguments.py @@ -9,67 +9,28 @@ def module_gui(sub_parser: argparse.ArgumentParser): - gui_parser = sub_parser.add_parser( - "gui", - description="Show collected resources and data.", - help="Show collected resources and data via GUI.", - ) - gui_parser.add_argument( - "--cache-path", default=CACHE_PATH, type=str, help="Path to cached resources." - ) - gui_parser.add_argument( - "-t", "--tenant", required=False, type=str, help="Tenant id to launch gui." - ) + gui_parser = sub_parser.add_parser("gui", description="Show collected resources and data.", help="Show collected resources and data via GUI.") + gui_parser.add_argument("--cache-path", default=CACHE_PATH, type=str, help="Path to cached resources.") + gui_parser.add_argument("-t", "--tenant", required=False, type=str, help="Tenant id to launch gui.") def module_dump(sub_parser: argparse.ArgumentParser): dump_parser = sub_parser.add_parser( - "dump", - description="Dump content for all available connection from recon", - help="Dump content for all available connection from recon", - ) - dump_parser.add_argument( - "-c", "--clear-cache", action="store_true", help="Clear local disk cache" - ) - dump_parser.add_argument( - "--cache-path", - default=CACHE_PATH, - help="Path to store collected resources and data.", - ) - dump_parser.add_argument( - "-t", "--tenant", required=False, type=str, help="Tenant id to connect." - ) - dump_parser.add_argument( - "-g", "--gui", action="store_true", help="Run local server for gui." - ) - dump_parser.add_argument( - "-r", - "--recon", - action="store_true", - help="Run recon before dump. Should be used if recon command was not run before.", + "dump", description="Dump content for all available connection from recon", help="Dump content for all available connection from recon" ) + dump_parser.add_argument("-c", "--clear-cache", action="store_true", help="Clear local disk cache") + dump_parser.add_argument("--cache-path", default=CACHE_PATH, help="Path to store collected resources and data.") + dump_parser.add_argument("-t", "--tenant", required=False, type=str, help="Tenant id to connect.") + dump_parser.add_argument("-g", "--gui", action="store_true", help="Run local server for gui.") + dump_parser.add_argument("-r", "--recon", action="store_true", help="Run recon before dump. Should be used if recon command was not run before.") def module_recon(sub_parser: argparse.ArgumentParser): - dump_parser = sub_parser.add_parser( - "recon", - description="Recon for available data connections", - help="Recon for available data connections.", - ) - dump_parser.add_argument( - "-c", "--clear-cache", action="store_true", help="Clear local disk cache" - ) - dump_parser.add_argument( - "--cache-path", - default=CACHE_PATH, - help="Path to store collected resources and data.", - ) - dump_parser.add_argument( - "-t", "--tenant", required=False, type=str, help="Tenant id to connect." - ) - dump_parser.add_argument( - "-g", "--gui", action="store_true", help="Run local server for gui." - ) + dump_parser = sub_parser.add_parser("recon", description="Recon for available data connections", help="Recon for available data connections.") + dump_parser.add_argument("-c", "--clear-cache", action="store_true", help="Clear local disk cache") + dump_parser.add_argument("--cache-path", default=CACHE_PATH, help="Path to store collected resources and data.") + dump_parser.add_argument("-t", "--tenant", required=False, type=str, help="Tenant id to connect.") + dump_parser.add_argument("-g", "--gui", action="store_true", help="Run local server for gui.") def module_nocodemalware(command_subparsers: argparse.ArgumentParser): @@ -79,238 +40,104 @@ def module_nocodemalware(command_subparsers: argparse.ArgumentParser): help="Repurpose trusted execs, service accounts and cloud services to power a malware operation.", ) nocodemalware_parser.add_argument( - "-w", - "--webhook-url", - required=True, - type=str, - help="Webhook url to the flow factory installed in powerplatform", - ) - nocodemalware_parser = nocodemalware_parser.add_subparsers( - help="nocodemalware_subcommand", dest="nocodemalware_subcommand" + "-w", "--webhook-url", required=True, type=str, help="Webhook url to the flow factory installed in powerplatform" ) + nocodemalware_parser = nocodemalware_parser.add_subparsers(help="nocodemalware_subcommand", dest="nocodemalware_subcommand") module_nocodemalware_subcommand_exec(nocodemalware_parser) def module_nocodemalware_subcommand_exec(command_subparsers: argparse.ArgumentParser): - steal_fqdn_parser = command_subparsers.add_parser( - "steal-cookie", description="Steal cookie of fqdn" - ) - steal_fqdn_parser.add_argument( - "--fqdn", - required=True, - type=str, - help="Fully qualified domain name to fetch the cookies of", - ) + steal_fqdn_parser = command_subparsers.add_parser("steal-cookie", description="Steal cookie of fqdn") + steal_fqdn_parser.add_argument("--fqdn", required=True, type=str, help="Fully qualified domain name to fetch the cookies of") - command_subparsers.add_parser( - "steal-power-automate-token", description="Steal power automate token" - ) + command_subparsers.add_parser("steal-power-automate-token", description="Steal power automate token") - execute_command_parser = command_subparsers.add_parser( - "command-exec", description="Execute command on machine" - ) + execute_command_parser = command_subparsers.add_parser("command-exec", description="Execute command on machine") execute_command_parser.add_argument( - "-t", - "--type", - required=True, - type=str, - choices=[cmd_type.value for cmd_type in CodeExecTypeEnum], - help="Command type", - ) - execute_command_parser.add_argument( - "-c", "--command-to-execute", required=True, type=str, help="Command to execute" + "-t", "--type", required=True, type=str, choices=[cmd_type.value for cmd_type in CodeExecTypeEnum], help="Command type" ) + execute_command_parser.add_argument("-c", "--command-to-execute", required=True, type=str, help="Command to execute") - ransomware_parser = command_subparsers.add_parser( - "ransomware", description="Ransomware" - ) + ransomware_parser = command_subparsers.add_parser("ransomware", description="Ransomware") + ransomware_parser.add_argument("--crawl_depth", required=True, type=str, help="Recursively search into subdirectories this many times") ransomware_parser.add_argument( - "--crawl_depth", - required=True, - type=str, - help="Recursively search into subdirectories this many times", + "-k", "--encryption-key", required=True, type=str, help="an encryption key used to encrypt each file identified (AES256)" ) ransomware_parser.add_argument( - "-k", - "--encryption-key", - required=True, - type=str, - help="an encryption key used to encrypt each file identified (AES256)", - ) - ransomware_parser.add_argument( - "--dirs", - required=True, - type=str, - help="A list of directories to begin crawl from separated by a comma (e.g.'C:\\,D:\\')", + "--dirs", required=True, type=str, help="A list of directories to begin crawl from separated by a comma (e.g.'C:\\,D:\\')" ) - exfiltrate_file_parser = command_subparsers.add_parser( - "exfiltrate", description="Exfiltrate file" - ) - exfiltrate_file_parser.add_argument( - "-f", "--file", required=True, type=str, help="Absolute path to file" - ) + exfiltrate_file_parser = command_subparsers.add_parser("exfiltrate", description="Exfiltrate file") + exfiltrate_file_parser.add_argument("-f", "--file", required=True, type=str, help="Absolute path to file") command_subparsers.add_parser("cleanup", description="Cleanup") def module_backdoor(command_subparsers: argparse.ArgumentParser): backdoor_parser = command_subparsers.add_parser( - "backdoor", - description="Install a backdoor on the target tenant.", - help="Install a backdoor on the target tenant", - ) - backdoor_parser.add_argument( - "-e", - "--environment-id", - required=True, - type=str, - help="Environment id in powerplatform.", - ) - backdoor_subparsers = backdoor_parser.add_subparsers( - help="backdoor_subcommand", dest="backdoor_subcommand" + "backdoor", description="Install a backdoor on the target tenant.", help="Install a backdoor on the target tenant" ) + backdoor_parser.add_argument("-e", "--environment-id", required=True, type=str, help="Environment id in powerplatform.") + backdoor_subparsers = backdoor_parser.add_subparsers(help="backdoor_subcommand", dest="backdoor_subcommand") ## Delete Flow parser ## delete_flow_parser = backdoor_subparsers.add_parser( - BackdoorActionType.delete_flow.value, - description="Deletes flow.", - help="Deletes flow using installed backdoor flow.", - ) - delete_flow_parser.add_argument( - "-w", - "--webhook-url", - required=True, - type=str, - help="Webhook url to the flow factory installed in powerplatform", - ) - delete_flow_parser.add_argument( - "-f", "--flow-id", type=str, help="Flow id to delete." + BackdoorActionType.delete_flow.value, description="Deletes flow.", help="Deletes flow using installed backdoor flow." ) + delete_flow_parser.add_argument("-w", "--webhook-url", required=True, type=str, help="Webhook url to the flow factory installed in powerplatform") + delete_flow_parser.add_argument("-f", "--flow-id", type=str, help="Flow id to delete.") ## Create Flow parser ## create_flow_parser = backdoor_subparsers.add_parser( - BackdoorActionType.create_flow.value, - description="Creates a flow.", - help="Creates a flow using installed backdoor flow.", - ) - create_flow_parser.add_argument( - "-w", - "--webhook-url", - required=True, - type=str, - help="Webhook url to the flow factory installed in powerplatform", - ) - create_flow_parser.add_argument( - "-i", - "--input", - type=str, - required=True, - help="Path to flow details input file.", + BackdoorActionType.create_flow.value, description="Creates a flow.", help="Creates a flow using installed backdoor flow." ) + create_flow_parser.add_argument("-w", "--webhook-url", required=True, type=str, help="Webhook url to the flow factory installed in powerplatform") + create_flow_parser.add_argument("-i", "--input", type=str, required=True, help="Path to flow details input file.") ## Get connections parser ## get_connections_parser = backdoor_subparsers.add_parser( - BackdoorActionType.get_connections.value, - description="Get connections", - help="Gets connections details in environment", + BackdoorActionType.get_connections.value, description="Get connections", help="Gets connections details in environment" ) get_connections_parser.add_argument( - "-w", - "--webhook-url", - required=True, - type=str, - help="Webhook url to the flow factory installed in powerplatform", - ) - get_connections_parser.add_argument( - "-o", "--output", type=str, default="", help="Path to output file." + "-w", "--webhook-url", required=True, type=str, help="Webhook url to the flow factory installed in powerplatform" ) + get_connections_parser.add_argument("-o", "--output", type=str, default="", help="Path to output file.") ## backdoor installer parser ## installer = backdoor_subparsers.add_parser( - BackdoorActionType.install_factory.value, - description="Install flow factory", - help="Installs flow factory in powerplatform", - ) - installer.add_argument( - "-c", - "--connection-id", - required=True, - type=str, - help="The connection id of management connection", - ) - installer.add_argument( - "-t", "--tenant", required=False, type=str, help="Tenant id to connect." + BackdoorActionType.install_factory.value, description="Install flow factory", help="Installs flow factory in powerplatform" ) + installer.add_argument("-c", "--connection-id", required=True, type=str, help="The connection id of management connection") + installer.add_argument("-t", "--tenant", required=False, type=str, help="Tenant id to connect.") def module_phishing(command_subparsers: argparse.ArgumentParser): - phishing = command_subparsers.add_parser( - "phishing", - description="Deploy a trustworthy phishing app", - help="Deploy a trustworthy phishing app.", - ) - phishing_subparsers = phishing.add_subparsers( - help="phishing_subcommand", dest="phishing_subcommand" - ) + phishing = command_subparsers.add_parser("phishing", description="Deploy a trustworthy phishing app", help="Deploy a trustworthy phishing app.") + phishing_subparsers = phishing.add_subparsers(help="phishing_subcommand", dest="phishing_subcommand") installer = phishing_subparsers.add_parser( - "install-app", - description="Installs phishing app.", - help="Installs a phishing app in the target environment.", - ) - installer.add_argument( - "-i", "--input", type=str, required=True, help="Path to app package zip file." - ) - installer.add_argument( - "-t", "--tenant", required=False, type=str, help="Tenant id to connect." - ) - installer.add_argument( - "-n", "--app-name", required=True, type=str, help="Display name of the app." - ) - installer.add_argument( - "-e", - "--environment-id", - required=True, - type=str, - help="Environment id to install the app in.", + "install-app", description="Installs phishing app.", help="Installs a phishing app in the target environment." ) + installer.add_argument("-i", "--input", type=str, required=True, help="Path to app package zip file.") + installer.add_argument("-t", "--tenant", required=False, type=str, help="Tenant id to connect.") + installer.add_argument("-n", "--app-name", required=True, type=str, help="Display name of the app.") + installer.add_argument("-e", "--environment-id", required=True, type=str, help="Environment id to install the app in.") - app_share = phishing_subparsers.add_parser( - "share-app", - description="Share app with organization", - help="Share app with organization", - ) - app_share.add_argument( - "-a", "--app-id", required=True, type=str, help="App id to share" - ) - app_share.add_argument( - "-e", - "--environment-id", - required=True, - type=str, - help="Environment id that the app belongs to.", - ) - app_share.add_argument( - "-t", "--tenant", required=True, type=str, help="Tenant id to connect." - ) + app_share = phishing_subparsers.add_parser("share-app", description="Share app with organization", help="Share app with organization") + app_share.add_argument("-a", "--app-id", required=True, type=str, help="App id to share") + app_share.add_argument("-e", "--environment-id", required=True, type=str, help="Environment id that the app belongs to.") + app_share.add_argument("-t", "--tenant", required=True, type=str, help="Tenant id to connect.") def module_copilot(command_subparsers: argparse.ArgumentParser): copilot = command_subparsers.add_parser( - "copilot", - description="Connects and interacts with copilot.", - help="Connects and interacts with copilot.", - ) - copilot_subparsers = copilot.add_subparsers( - help="copilot_subcommand", dest="copilot_subcommand" + "copilot", description="Connects and interacts with copilot.", help="Connects and interacts with copilot." ) + copilot_subparsers = copilot.add_subparsers(help="copilot_subcommand", dest="copilot_subcommand") interactive_chat = copilot_subparsers.add_parser( - "chat", - description="Starts an interactive chat with copilot", - help="Connects to copilot and starts an interactive chat session.", + "chat", description="Starts an interactive chat with copilot", help="Connects to copilot and starts an interactive chat session." ) copilot_modules(interactive_chat) @@ -326,24 +153,11 @@ def module_copilot(command_subparsers: argparse.ArgumentParser): def copilot_modules(parser): + parser.add_argument("-u", "--user", required=True, type=str, help="User email to connect.") + parser.add_argument("-p", "--password", required=False, type=str, help="User password to connect.") + parser.add_argument("--cached-token", action="store_true", help="Use cached access token to connect to copilot if exists.") parser.add_argument( - "-u", "--user", required=True, type=str, help="User email to connect." - ) - parser.add_argument( - "-p", "--password", required=False, type=str, help="User password to connect." - ) - parser.add_argument( - "--cached-token", - action="store_true", - help="Use cached access token to connect to copilot if exists.", - ) - parser.add_argument( - "-s", - "--scenario", - required=True, - type=str, - choices=[scenario_type.value for scenario_type in CopilotScenarioEnum], - help="Scenario to run.", + "-s", "--scenario", required=True, type=str, choices=[scenario_type.value for scenario_type in CopilotScenarioEnum], help="Scenario to run." ) parser.add_argument( @@ -359,13 +173,9 @@ def copilot_modules(parser): def module_copilot_studio(command_subparsers: argparse.ArgumentParser): copilot = command_subparsers.add_parser( - "copilot-studio-hunter", - description="Scan, enumerate and recon Copilot Studio bots.", - help="Scan, enumerate and recon Copilot Studio bots.", - ) - copilot_subparsers = copilot.add_subparsers( - help="copilot_studio_subcommand", dest="copilot_studio_subcommand" + "copilot-studio-hunter", description="Scan, enumerate and recon Copilot Studio bots.", help="Scan, enumerate and recon Copilot Studio bots." ) + copilot_subparsers = copilot.add_subparsers(help="copilot_studio_subcommand", dest="copilot_studio_subcommand") deep_scan = copilot_subparsers.add_parser( "deep-scan", @@ -385,74 +195,26 @@ def module_copilot_studio(command_subparsers: argparse.ArgumentParser): def copilot_studio_modules(parser: argparse.ArgumentParser, module: str): if module == "deep-scan": + parser.add_argument("-r", "--rate", type=int, default=0, help="Rate limit in seconds between ffuf requests") + parser.add_argument("-t", "--threads", type=int, default=40, help="Number of concurrent ffuf threads") + parser.add_argument("--mode", choices=["verbose", "silent"], default="-s", help="Choose between verbose (-v) and silent (-s) mode for ffuf.") + parser.add_argument("-tp", "--timeout_prefix", help="The timeout for the solution prefix scan to have (in seconds)", default=300) parser.add_argument( - "-r", - "--rate", - type=int, - default=0, - help="Rate limit in seconds between ffuf requests", - ) - parser.add_argument( - "-t", - "--threads", - type=int, - default=40, - help="Number of concurrent ffuf threads", - ) - parser.add_argument( - "--mode", - choices=["verbose", "silent"], - default="-s", - help="Choose between verbose (-v) and silent (-s) mode for ffuf.", - ) - parser.add_argument( - "-tp", - "--timeout_prefix", - help="The timeout for the solution prefix scan to have (in seconds)", - default=300, - ) - parser.add_argument( - "-tb", - "--timeout_bots", - help="The timeout for each of the bot scans (one-word/two-word/three-word) to have (in seconds)", - default=300, + "-tb", "--timeout_bots", help="The timeout for each of the bot scans (one-word/two-word/three-word) to have (in seconds)", default=300 ) group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "-d", - "--domain", - type=str, - help="The domain to query for tenant ID and run ffuf on", - ) - group.add_argument( - "-i", "--tenant-id", type=str, help="The tenant ID to run FFUF on" - ) + group.add_argument("-d", "--domain", type=str, help="The domain to query for tenant ID and run ffuf on") + group.add_argument("-i", "--tenant-id", type=str, help="The tenant ID to run FFUF on") if module == "enum": - parser.add_argument( - "-e", - "--enumerate", - choices=["environment", "tenant"], - help="Run the enumeration function on environment or tenant", - ) - parser.add_argument( - "-t", - "--timeout", - help="The timeout for the enumeration process to have (in seconds)", - default=300, - ) + parser.add_argument("-e", "--enumerate", choices=["environment", "tenant"], help="Run the enumeration function on environment or tenant") + parser.add_argument("-t", "--timeout", help="The timeout for the enumeration process to have (in seconds)", default=300) def parse_arguments(): parser = argparse.ArgumentParser() - parser.add_argument( - "-l", - "--log-level", - default=logging.INFO, - type=lambda x: getattr(logging, x), - help="Configure the logging level.", - ) + parser.add_argument("-l", "--log-level", default=logging.INFO, type=lambda x: getattr(logging, x), help="Configure the logging level.") command_subparsers = parser.add_subparsers(help="command", dest="command") module_dump(command_subparsers) diff --git a/src/powerpwn/cli/runners.py b/src/powerpwn/cli/runners.py index 5b5d870..556b349 100644 --- a/src/powerpwn/cli/runners.py +++ b/src/powerpwn/cli/runners.py @@ -41,9 +41,7 @@ def __init_command_token(args, scope: str) -> Auth: if auth := acquire_token_from_cached_refresh_token(scope, args.tenant): return auth - logger.info( - "Failed to acquire token from cached refresh token. Falling back to device-flow authentication to acquire new token." - ) + logger.info("Failed to acquire token from cached refresh token. Falling back to device-flow authentication to acquire new token.") return acquire_token(scope=scope, tenant=args.tenant) @@ -65,19 +63,13 @@ def run_recon_command(args) -> str: # cache if args.clear_cache: - __clear_cache( - os.path.join(args.cache_path, os.path.join(auth.tenant, "resources")) - ) + __clear_cache(os.path.join(args.cache_path, os.path.join(auth.tenant, "resources"))) scoped_cache_path = _get_scoped_cache_path(args, auth.tenant) - entities_fetcher = ResourcesCollector( - token=auth.token, cache_path=scoped_cache_path - ) + entities_fetcher = ResourcesCollector(token=auth.token, cache_path=scoped_cache_path) entities_fetcher.collect_and_cache() - logger.info( - f"Recon is completed for tenant {auth.tenant} in {entities_path(scoped_cache_path)}" - ) + logger.info(f"Recon is completed for tenant {auth.tenant} in {entities_path(scoped_cache_path)}") return auth.tenant @@ -102,14 +94,10 @@ def run_gui_command(args) -> None: tenant_list = os.listdir(args.cache_path) if len(tenant_list) == 1: scoped_cache_path = _get_scoped_cache_path(args, tenant_list[0]) - logger.info( - f"Only one tenant found in cache path. Using '{tenant_list[0]}' as tenant." - ) + logger.info(f"Only one tenant found in cache path. Using '{tenant_list[0]}' as tenant.") if not scoped_cache_path: - logger.error( - "Tenant is not provided and it can not be found in cache. Please provide tenant id with -t flag." - ) + logger.error("Tenant is not provided and it can not be found in cache. Please provide tenant id with -t flag.") return Gui().run(cache_path=scoped_cache_path) @@ -124,17 +112,11 @@ def run_dump_command(args) -> str: __clear_cache(os.path.join(args.cache_path, os.path.join(auth.tenant, "data"))) scoped_cache_path = _get_scoped_cache_path(args, auth.tenant) - is_data_collected = DataCollector( - token=auth.token, cache_path=scoped_cache_path - ).collect() + is_data_collected = DataCollector(token=auth.token, cache_path=scoped_cache_path).collect() if not is_data_collected: - logger.info( - "No resources found to get data dump. Please make sure recon runs first or run dump command again with -r/--recon flag." - ) + logger.info("No resources found to get data dump. Please make sure recon runs first or run dump command again with -r/--recon flag.") else: - logger.info( - f"Dump is completed for tenant {auth.tenant} in {collected_data_path(scoped_cache_path)}" - ) + logger.info(f"Dump is completed for tenant {auth.tenant} in {collected_data_path(scoped_cache_path)}") return auth.tenant @@ -152,9 +134,7 @@ def run_backdoor_flow_command(args): elif action_type == BackdoorActionType.get_connections: backdoor_flow = BackdoorFlow(args.webhook_url) output_to_file = args.output and args.output != "" - connections = backdoor_flow.get_connections( - args.environment_id, not output_to_file - ) + connections = backdoor_flow.get_connections(args.environment_id, not output_to_file) if output_to_file: f = open(args.output, "w+") f.write(json.dumps(connections, indent=4)) @@ -173,15 +153,11 @@ def run_nocodemalware_command(args): if command_type == CommandToRunEnum.CLEANUP: res = malware_runner.cleanup() elif command_type == CommandToRunEnum.CODE_EXEC: - res = malware_runner.exec_command( - args.command_to_execute, CodeExecTypeEnum(args.type) - ) + res = malware_runner.exec_command(args.command_to_execute, CodeExecTypeEnum(args.type)) elif command_type == CommandToRunEnum.EXFILTRATION: res = malware_runner.exfiltrate(args.file) elif command_type == CommandToRunEnum.RANSOMWARE: - res = malware_runner.ransomware( - args.crawl_depth, args.dirs.split(","), args.encryption_key - ) + res = malware_runner.ransomware(args.crawl_depth, args.dirs.split(","), args.encryption_key) elif command_type == CommandToRunEnum.STEAL_COOKIE: res = malware_runner.steal_cookie(args.cookie) elif command_type == CommandToRunEnum.STEAL_POWER_AUTOMATE_TOKEN: @@ -195,9 +171,7 @@ def run_phishing_command(args): if args.phishing_subcommand == "install-app": return app_installer.install_app(args.input, args.app_name, args.environment_id) elif args.phishing_subcommand == "share-app": - return app_installer.share_app_with_org( - args.app_id, args.environment_id, args.tenant - ) + return app_installer.share_app_with_org(args.app_id, args.environment_id, args.tenant) raise NotImplementedError("Phishing command has not been implemented yet.") diff --git a/src/powerpwn/copilot_studio/modules/deep_scan.py b/src/powerpwn/copilot_studio/modules/deep_scan.py index f15e816..b0dd43d 100644 --- a/src/powerpwn/copilot_studio/modules/deep_scan.py +++ b/src/powerpwn/copilot_studio/modules/deep_scan.py @@ -2,7 +2,7 @@ import os import re import signal -import subprocess +import subprocess # nosec import sys import pandas as pd @@ -92,9 +92,7 @@ def get_tenant_id(domain: str, timeout: int = 10): tenant_id = response.json().get("issuer").split("/")[3] return tenant_id else: - logging.error( - f"Failed to retrieve tenant ID: {response.status_code} {response.text}" - ) + logging.error(f"Failed to retrieve tenant ID: {response.status_code} {response.text}") return None except requests.Timeout: logging.error(f"Request timed out after {timeout} seconds") @@ -134,9 +132,7 @@ def get_ffuf_results( :param timeout: The timeout (in seconds) to set for each one of the FFUF scans (1-word/2-word/3-word) :return: Returns the FFUF terminal output per line, to allow users to see progress (-s can be used to silence this) """ - ffuf_path_1 = get_project_file_path( - "internal_results/ffuf_results", f"ffuf_results_{domain}_one_word_names.csv" - ) + ffuf_path_1 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{domain}_one_word_names.csv") # Run first for one-letter words command = [ @@ -193,7 +189,7 @@ def get_ffuf_results( print("\nScanning for 1-word bot names") # TODO: Verify and improve guardrails for using subprocess - popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) + popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) # nosec for stdout_line in iter(popen.stdout.readline, ""): yield stdout_line, popen popen.stdout.close() @@ -201,9 +197,7 @@ def get_ffuf_results( if return_code: raise subprocess.CalledProcessError(return_code, command) - ffuf_path_2 = get_project_file_path( - "internal_results/ffuf_results", f"ffuf_results_{domain}_two_word_names.csv" - ) + ffuf_path_2 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{domain}_two_word_names.csv") # Run for 2-letter words command = [ @@ -262,7 +256,7 @@ def get_ffuf_results( print("\nScanning for 2-word bot names") # TODO: Verify and improve guardrails for using subprocess - popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) + popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) # nosec for stdout_line in iter(popen.stdout.readline, ""): yield stdout_line, popen popen.stdout.close() @@ -270,9 +264,7 @@ def get_ffuf_results( if return_code: raise subprocess.CalledProcessError(return_code, command) - ffuf_path_3 = get_project_file_path( - "internal_results/ffuf_results", f"ffuf_results_{domain}.csv" - ) + ffuf_path_3 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{domain}.csv") # Use for 3-letter words command = [ @@ -333,7 +325,7 @@ def get_ffuf_results( print("\nScanning for 3-word bot names") # TODO: Verify and improve guardrails for using subprocess - popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) + popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) # nosec for stdout_line in iter(popen.stdout.readline, ""): yield stdout_line, popen popen.stdout.close() @@ -342,14 +334,7 @@ def get_ffuf_results( raise subprocess.CalledProcessError(return_code, command) -def get_ffuf_results_prefix( - endpoint: str, - wordlist_prefix: str, - wordlist_suffix_1: str, - rate_limit: int, - threads: int, - timeout: int, -) -> str: +def get_ffuf_results_prefix(endpoint: str, wordlist_prefix: str, wordlist_suffix_1: str, rate_limit: int, threads: int, timeout: int) -> str: """ Run FFUF on the input endpoint: 1. Use fuzzing locations in the URL based on the FUZZ1/2/3 keywords. @@ -412,7 +397,7 @@ def get_ffuf_results_prefix( ] # TODO: Verify and improve guardrails for using subprocess - popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) + popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) # nosec for stdout_line in iter(popen.stdout.readline, ""): if "FUZZ1:" in stdout_line: fuzz1_value = stdout_line.split("FUZZ1:")[1].split()[0] @@ -447,8 +432,7 @@ def print_brand(tenant: str): } # Make the request - response = requests.get(url, headers=headers) - + response = requests.get(url, headers=headers) # nosec # Check if the request was successful if response.status_code == 200: data = response.json() @@ -479,18 +463,10 @@ def run(self): if self.args.domain: - ffuf_path_1 = get_project_file_path( - "internal_results/ffuf_results", - f"ffuf_results_{self.args.domain}_one_word_names.csv", - ) + ffuf_path_1 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{self.args.domain}_one_word_names.csv") - ffuf_path_2 = get_project_file_path( - "internal_results/ffuf_results", - f"ffuf_results_{self.args.domain}_two_word_names.csv", - ) - ffuf_path_3 = get_project_file_path( - "internal_results/ffuf_results", f"ffuf_results_{self.args.domain}.csv" - ) + ffuf_path_2 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{self.args.domain}_two_word_names.csv") + ffuf_path_3 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{self.args.domain}.csv") tenant_id = get_tenant_id(self.args.domain) @@ -503,31 +479,21 @@ def run(self): tenant_id_for_api = tenant_id.replace("-", "") env_bots_endpoint = f"https://default{tenant_id_for_api[:-2]}.{tenant_id_for_api[-2:]}.environment.api.powerplatform.com/powervirtualagents/botsbyschema/" - logging.info( - f"Endpoint for the default environment bots schema: {env_bots_endpoint}" - ) + logging.info(f"Endpoint for the default environment bots schema: {env_bots_endpoint}") fuzz1_value = None fuzz_file_name = f"fuzz1_value_{self.args.domain}.txt" - fuzz_file_path = get_project_file_path( - "internal_results/prefix_fuzz_values", f"{fuzz_file_name}" - ) + fuzz_file_path = get_project_file_path("internal_results/prefix_fuzz_values", f"{fuzz_file_name}") - print( - "Checking if an existing solution publisher prefix value exists for this domain." - ) + print("Checking if an existing solution publisher prefix value exists for this domain.") # Check if the file exists if os.path.exists(fuzz_file_path): # Open the file and read the first line with open(fuzz_file_path, "r") as file: - first_line = ( - file.readline().strip() - ) # .strip() removes any leading/trailing whitespace + first_line = file.readline().strip() # .strip() removes any leading/trailing whitespace - print( - "An existing solution publisher prefix value was found for this domain, continuing to search for CoPilot demo websites." - ) + print("An existing solution publisher prefix value was found for this domain, continuing to search for CoPilot demo websites.") if first_line: for line, popen in get_ffuf_results( @@ -568,9 +534,7 @@ def run(self): transformed_urls = df["url"].apply(transform_url) url_file = f"url_output_{self.args.domain}.txt" - url_file_path = get_project_file_path( - "internal_results/url_results", f"{url_file}" - ) + url_file_path = get_project_file_path("internal_results/url_results", f"{url_file}") # Save the result to a new text file with open(f"{url_file_path}", "w") as f: @@ -589,22 +553,18 @@ def run(self): # TODO: Verify and improve guardrails for using subprocess + replace shell=True # Run the command - subprocess.run(command, shell=True, check=True) + subprocess.run(command, shell=True, check=True) # nosec except subprocess.CalledProcessError as e: print(f"Error occurred: {e}") - print( - "Done, results saved under final_results/chat_exists_output.txt" - ) + print("Done, results saved under final_results/chat_exists_output.txt") else: logging.error("Did not find a solution publisher prefix") sys.exit(1) else: - print( - "No existing solution publisher prefix value found for this domain, starting prefix scan." - ) + print("No existing solution publisher prefix value found for this domain, starting prefix scan.") for value, popen in get_ffuf_results_prefix( env_bots_endpoint, @@ -616,15 +576,11 @@ def run(self): ): if value: fuzz1_value = value - print( - "Found default solution publisher prefix, proceeding to scan bot names" - ) + print("Found default solution publisher prefix, proceeding to scan bot names") break fuzz_file_name = f"fuzz1_value_{self.args.domain}.txt" - fuzz_file_path = get_project_file_path( - "internal_results/prefix_fuzz_values", f"{fuzz_file_name}" - ) + fuzz_file_path = get_project_file_path("internal_results/prefix_fuzz_values", f"{fuzz_file_name}") if fuzz1_value: with open(fuzz_file_path, "w") as file: @@ -672,9 +628,7 @@ def run(self): transformed_urls = df["url"].apply(transform_url) url_file = f"url_output_{self.args.domain}.txt" - url_file_path = get_project_file_path( - "internal_results/url_results", f"{url_file}" - ) + url_file_path = get_project_file_path("internal_results/url_results", f"{url_file}") # Save the result to a new text file with open(url_file_path, "w") as f: @@ -693,13 +647,11 @@ def run(self): # TODO: Verify and improve guardrails for using subprocess + replace shell=True # Run the command - subprocess.run(command, shell=True, check=True) + subprocess.run(command, shell=True, check=True) # nosec except subprocess.CalledProcessError as e: print(f"Error occurred: {e}") - print( - "Done, results saved under final_results/chat_exists_output.txt" - ) + print("Done, results saved under final_results/chat_exists_output.txt") else: logging.error("Did not find a solution publisher prefix") @@ -710,48 +662,29 @@ def run(self): # Print the tenant's domain if available print_brand(self.args.tenant_id) - ffuf_path_1 = get_project_file_path( - "internal_results/ffuf_results", - f"ffuf_results_{self.args.tenant_id}_one_word_names.csv", - ) + ffuf_path_1 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{self.args.tenant_id}_one_word_names.csv") - ffuf_path_2 = get_project_file_path( - "internal_results/ffuf_results", - f"ffuf_results_{self.args.tenant_id}_two_word_names.csv", - ) + ffuf_path_2 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{self.args.tenant_id}_two_word_names.csv") - ffuf_path_3 = get_project_file_path( - "internal_results/ffuf_results", - f"ffuf_results_{self.args.tenant_id}.csv", - ) + ffuf_path_3 = get_project_file_path("internal_results/ffuf_results", f"ffuf_results_{self.args.tenant_id}.csv") ten_id = self.args.tenant_id.replace("-", "").replace("Default", "") env_bots_endpoint = f"https://default{ten_id[:-2]}.{ten_id[-2:]}.environment.api.powerplatform.com/powervirtualagents/botsbyschema/" - logging.info( - f"Endpoint for the environment ID bots schema: {env_bots_endpoint}" - ) + logging.info(f"Endpoint for the environment ID bots schema: {env_bots_endpoint}") fuzz1_value = None fuzz_file_name = f"fuzz1_value_{self.args.tenant_id}.txt" - fuzz_file_path = get_project_file_path( - "internal_results/prefix_fuzz_values", f"{fuzz_file_name}" - ) + fuzz_file_path = get_project_file_path("internal_results/prefix_fuzz_values", f"{fuzz_file_name}") - print( - "Checking if an existing solution publisher prefix value exists for this domain." - ) + print("Checking if an existing solution publisher prefix value exists for this domain.") # Check if the file exists if os.path.exists(fuzz_file_path): # Open the file and read the first line with open(fuzz_file_path, "r") as file: - first_line = ( - file.readline().strip() - ) # .strip() removes any leading/trailing whitespace + first_line = file.readline().strip() # .strip() removes any leading/trailing whitespace - print( - "An existing publisher prefix value was found for this tenant, continuing to search for CoPilot demo websites." - ) + print("An existing publisher prefix value was found for this tenant, continuing to search for CoPilot demo websites.") if first_line: for line, popen in get_ffuf_results( @@ -792,9 +725,7 @@ def run(self): transformed_urls = df["url"].apply(transform_url) url_file = f"url_output_{self.args.tenant_id}.txt" - url_file_path = get_project_file_path( - "internal_results/url_results", f"{url_file}" - ) + url_file_path = get_project_file_path("internal_results/url_results", f"{url_file}") # Save the result to a new text file with open(f"{url_file_path}", "w") as f: @@ -813,22 +744,18 @@ def run(self): # TODO: Verify and improve guardrails for using subprocess + replace shell=True # Run the command - subprocess.run(command, shell=True, check=True) + subprocess.run(command, shell=True, check=True) # nosec except subprocess.CalledProcessError as e: print(f"Error occurred: {e}") - print( - "Done, results saved under final_results/chat_exists_output.txt" - ) + print("Done, results saved under final_results/chat_exists_output.txt") else: logging.error("Did not find a default solution publisher prefix") sys.exit(1) else: - print( - "No existing prefix value found for this tenant, starting solution publisher prefix scan." - ) + print("No existing prefix value found for this tenant, starting solution publisher prefix scan.") logging.info("Running ffuf") for value, popen in get_ffuf_results_prefix( @@ -841,15 +768,11 @@ def run(self): ): if value: fuzz1_value = value - print( - "Found solution publisher prefix, proceeding to scan bot names" - ) + print("Found solution publisher prefix, proceeding to scan bot names") break fuzz_file_name = f"fuzz1_value_{self.args.tenant_id}.txt" - fuzz_file_path = get_project_file_path( - "internal_results/prefix_fuzz_values", f"{fuzz_file_name}" - ) + fuzz_file_path = get_project_file_path("internal_results/prefix_fuzz_values", f"{fuzz_file_name}") if fuzz1_value: with open(fuzz_file_path, "w") as file: @@ -896,9 +819,7 @@ def run(self): transformed_urls = df["url"].apply(transform_url) url_file = f"url_output_{self.args.tenant_id}.txt" - url_file_path = get_project_file_path( - "internal_results/url_results", f"{url_file}" - ) + url_file_path = get_project_file_path("internal_results/url_results", f"{url_file}") # Save the result to a new text file with open(url_file_path, "w") as f: @@ -917,13 +838,11 @@ def run(self): # TODO: Verify and improve guardrails for using subprocess + replace shell=True # Run the command - subprocess.run(command, shell=True, check=True) + subprocess.run(command, shell=True, check=True) # nosec except subprocess.CalledProcessError as e: print(f"Error occurred: {e}") - print( - "Done, results saved under final_results/chat_exists_output.txt" - ) + print("Done, results saved under final_results/chat_exists_output.txt") else: logging.error("Did not find a solution publisher prefix") diff --git a/src/powerpwn/copilot_studio/modules/enum.py b/src/powerpwn/copilot_studio/modules/enum.py index deb62ea..53ff9f3 100644 --- a/src/powerpwn/copilot_studio/modules/enum.py +++ b/src/powerpwn/copilot_studio/modules/enum.py @@ -2,7 +2,7 @@ import logging import os import re -import subprocess +import subprocess # nosec import time from powerpwn.copilot_studio.modules.path_utils import get_project_file_path @@ -13,9 +13,7 @@ def is_valid_subdomain(subdomain: str) -> bool: Validate that the subdomain follows the expected pattern. """ # Define a regex pattern for valid subdomains - pattern = re.compile( - r"^[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-]{2}\.(tenant|environment)\.api\.powerplatform\.com$" - ) + pattern = re.compile(r"^[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-]{2}\.(tenant|environment)\.api\.powerplatform\.com$") return pattern.match(subdomain) is not None @@ -26,9 +24,7 @@ def write_to_file(values: list, filename: str): :param values: The list of values to write :param filename: The file to write to """ - os.makedirs( - os.path.dirname(filename), exist_ok=True - ) # Ensure the output directory exists + os.makedirs(os.path.dirname(filename), exist_ok=True) # Ensure the output directory exists with open(filename, "w") as file: for value in values: @@ -44,7 +40,7 @@ def get_amass_results(domain_type: str, timeout: int) -> bytes: """ command = ["amass", "enum", "-d", f"{domain_type}.api.powerplatform.com"] start_time = time.time() - popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) + popen = subprocess.Popen(command, stdout=subprocess.PIPE, universal_newlines=True) # nosec try: for stdout_line in iter(popen.stdout.readline, ""): if time.time() - start_time > timeout: @@ -97,40 +93,26 @@ def __init__(self, args): self.run() def run(self): - print( - f"Starting to enumerate {self.args.enumerate}s, disconnect from VPN during this part for best results" - ) + print(f"Starting to enumerate {self.args.enumerate}s, disconnect from VPN during this part for best results") print(f"Timeout defined to {int(self.args.timeout) / 60} minutes") - print( - "Enumeration results will be printed below and saved to the final_results directory" - ) + print("Enumeration results will be printed below and saved to the final_results directory") try: - for line, popen in get_amass_results( - self.args.enumerate, self.args.timeout - ): + for line, popen in get_amass_results(self.args.enumerate, self.args.timeout): subdomain = line.strip().split(" (FQDN) -->")[0] if is_valid_subdomain(subdomain): - formatted_subdomain = format_subdomain( - subdomain, self.args.enumerate - ) - if ( - "enviro-nmen-t" not in formatted_subdomain - ): # TODO: check if still relevant + formatted_subdomain = format_subdomain(subdomain, self.args.enumerate) + if "enviro-nmen-t" not in formatted_subdomain: # TODO: check if still relevant print(formatted_subdomain) self.tenant_results.append(formatted_subdomain) else: logging.warning(f"Filtered out invalid subdomain: {subdomain}") except subprocess.TimeoutExpired: - logging.error( - f"Amass enumeration timed out after {self.args.timeout} seconds" - ) + logging.error(f"Amass enumeration timed out after {self.args.timeout} seconds") print(f"Amass enumeration timed out after {self.args.timeout} seconds") finally: # Ensure partial results are saved even if timeout occurs timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") unique_filename = f"{self.args.enumerate}_enumeration_results_{timestamp}" - amass_path = get_project_file_path( - "final_results", f"{unique_filename}.txt" - ) + amass_path = get_project_file_path("final_results", f"{unique_filename}.txt") write_to_file(self.tenant_results, f"{amass_path}") logging.info(f"{self.args.enumerate}s enumerated and saved to {amass_path}") diff --git a/src/powerpwn/main.py b/src/powerpwn/main.py index 88cd0ce..f80a6d6 100644 --- a/src/powerpwn/main.py +++ b/src/powerpwn/main.py @@ -26,11 +26,7 @@ def main(): args = parse_arguments() - logging.basicConfig( - level=args.log_level, - format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) + logging.basicConfig(level=args.log_level, format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger.level = args.log_level command = args.command