Skip to content

Commit

Permalink
ruff: add flake8-simplify (SIM) and auto-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Marshall-Hallenbeck committed Oct 14, 2023
1 parent cf63796 commit d8b2931
Show file tree
Hide file tree
Showing 41 changed files with 209 additions and 436 deletions.
2 changes: 1 addition & 1 deletion nxc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def gen_cli_args():
p_loader = ProtocolLoader()
protocols = p_loader.get_protocols()

for protocol in protocols.keys():
for protocol in protocols:
try:
protocol_object = p_loader.load_protocol(protocols[protocol]["argspath"])
subparsers = protocol_object.proto_args(subparsers, std_parser, module_parser)
Expand Down
18 changes: 8 additions & 10 deletions nxc/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self, args, db, host):
self.admin_privs = False
self.password = ""
self.username = ""
self.kerberos = True if self.args.kerberos or self.args.use_kcache or self.args.aesKey else False
self.kerberos = bool(self.args.kerberos or self.args.use_kcache or self.args.aesKey)
self.aesKey = None if not self.args.aesKey else self.args.aesKey[0]
self.kdcHost = None if not self.args.kdcHost else self.args.kdcHost
self.use_kcache = None if not self.args.use_kcache else self.args.use_kcache
Expand Down Expand Up @@ -184,10 +184,9 @@ def call_cmd_args(self):
None
"""
for attr, value in vars(self.args).items():
if hasattr(self, attr) and callable(getattr(self, attr)):
if value is not False and value is not None:
self.logger.debug(f"Calling {attr}()")
getattr(self, attr)()
if hasattr(self, attr) and callable(getattr(self, attr)) and value is not False and value is not None:
self.logger.debug(f"Calling {attr}()")
getattr(self, attr)()

def call_modules(self):
"""Calls modules and performs various actions based on the module's attributes.
Expand Down Expand Up @@ -231,7 +230,7 @@ def inc_failed_login(self, username):
global global_failed_logins
global user_failed_logins

if username not in user_failed_logins.keys():
if username not in user_failed_logins:
user_failed_logins[username] = 0

user_failed_logins[username] += 1
Expand All @@ -248,9 +247,8 @@ def over_fail_limit(self, username):
if self.failed_logins == self.args.fail_limit:
return True

if username in user_failed_logins.keys():
if self.args.ufail_limit == user_failed_logins[username]:
return True
if username in user_failed_logins and self.args.ufail_limit == user_failed_logins[username]:
return True

return False

Expand Down Expand Up @@ -392,7 +390,7 @@ def try_credentials(self, domain, username, owned, secret, cred_type, data=None)
if self.args.continue_on_success and owned:
return False
# Enforcing FQDN for SMB if not using local authentication. Related issues/PRs: #26, #28, #24, #38
if self.args.protocol == "smb" and not self.args.local_auth and "." not in domain and not self.args.laps and secret != "" and not (self.domain.upper() == self.hostname.upper()):
if self.args.protocol == "smb" and not self.args.local_auth and "." not in domain and not self.args.laps and secret != "" and self.domain.upper() != self.hostname.upper():
self.logger.error(f"Domain {domain} for user {username.rstrip()} need to be FQDN ex:domain.local, not domain")
return False

Expand Down
27 changes: 13 additions & 14 deletions nxc/helpers/bloodhound.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,21 @@ def add_user_bh(user, domain, logger, config):
encrypted=False,
)
try:
with driver.session() as session:
with session.begin_transaction() as tx:
for info in users_owned:
if info["username"][-1] == "$":
user_owned = info["username"][:-1] + "." + info["domain"]
account_type = "Computer"
else:
user_owned = info["username"] + "@" + info["domain"]
account_type = "User"
with driver.session() as session, session.begin_transaction() as tx:
for info in users_owned:
if info["username"][-1] == "$":
user_owned = info["username"][:-1] + "." + info["domain"]
account_type = "Computer"
else:
user_owned = info["username"] + "@" + info["domain"]
account_type = "User"

result = tx.run(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) RETURN c')
result = tx.run(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) RETURN c')

if result.data()[0]["c"].get("owned") in (False, None):
logger.debug(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name')
result = tx.run(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name')
logger.highlight(f"Node {user_owned} successfully set as owned in BloodHound")
if result.data()[0]["c"].get("owned") in (False, None):
logger.debug(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name')
result = tx.run(f'MATCH (c:{account_type} {{name:"{user_owned}"}}) SET c.owned=True RETURN c.name AS name')
logger.highlight(f"Node {user_owned} successfully set as owned in BloodHound")
except AuthError:
logger.fail(f"Provided Neo4J credentials ({config.get('BloodHound', 'bh_user')}:{config.get('BloodHound', 'bh_pass')}) are not valid.")
return
Expand Down
5 changes: 1 addition & 4 deletions nxc/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ def gen_random_string(length=10):

def validate_ntlm(data):
allowed = re.compile("^[0-9a-f]{32}", re.IGNORECASE)
if allowed.match(data):
return True
else:
return False
return bool(allowed.match(data))


def called_from_cmd_args():
Expand Down
26 changes: 1 addition & 25 deletions nxc/helpers/powershell.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,31 +139,7 @@ def create_ps_command(ps_command, force_ps32=False, dont_obfs=False, custom_amsi
}catch{}
"""

if force_ps32:
command = (
amsi_bypass
+ f"""
$functions = {{
function Command-ToExecute
{{
{amsi_bypass + ps_command}
}}
}}
if ($Env:PROCESSOR_ARCHITECTURE -eq 'AMD64')
{{
$job = Start-Job -InitializationScript $functions -ScriptBlock {{Command-ToExecute}} -RunAs32
$job | Wait-Job
}}
else
{{
IEX "$functions"
Command-ToExecute
}}
"""
)

else:
command = amsi_bypass + ps_command
command = amsi_bypass + f"\n$functions = {{\n function Command-ToExecute\n {{\n{amsi_bypass + ps_command}\n }}\n}}\nif ($Env:PROCESSOR_ARCHITECTURE -eq 'AMD64')\n{{\n $job = Start-Job -InitializationScript $functions -ScriptBlock {{Command-ToExecute}} -RunAs32\n $job | Wait-Job\n}}\nelse\n{{\n IEX \"$functions\"\n Command-ToExecute\n}}\n" if force_ps32 else amsi_bypass + ps_command

nxc_logger.debug(f"Generated PS command:\n {command}\n")

Expand Down
22 changes: 9 additions & 13 deletions nxc/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,25 @@ def format(self, msg, *args, **kwargs): # noqa: A003
if self.extra is None:
return f"{msg}", kwargs

if "module_name" in self.extra.keys():
if len(self.extra["module_name"]) > 8:
self.extra["module_name"] = self.extra["module_name"][:8] + "..."
if "module_name" in self.extra and len(self.extra["module_name"]) > 8:
self.extra["module_name"] = self.extra["module_name"][:8] + "..."

# If the logger is being called when hooking the 'options' module function
if len(self.extra) == 1 and ("module_name" in self.extra.keys()):
if len(self.extra) == 1 and ("module_name" in self.extra):
return (
f"{colored(self.extra['module_name'], 'cyan', attrs=['bold']):<64} {msg}",
kwargs,
)

# If the logger is being called from nxcServer
if len(self.extra) == 2 and ("module_name" in self.extra.keys()) and ("host" in self.extra.keys()):
if len(self.extra) == 2 and ("module_name" in self.extra) and ("host" in self.extra):
return (
f"{colored(self.extra['module_name'], 'cyan', attrs=['bold']):<24} {self.extra['host']:<39} {msg}",
kwargs,
)

# If the logger is being called from a protocol
if "module_name" in self.extra.keys():
module_name = colored(self.extra["module_name"], "cyan", attrs=["bold"])
else:
module_name = colored(self.extra["protocol"], "blue", attrs=["bold"])
module_name = colored(self.extra["module_name"], "cyan", attrs=["bold"]) if "module_name" in self.extra else colored(self.extra["protocol"], "blue", attrs=["bold"])

return (
f"{module_name:<24} {self.extra['host']:<15} {self.extra['port']:<6} {self.extra['hostname'] if self.extra['hostname'] else 'NONE':<16} {msg}",
Expand All @@ -75,7 +71,7 @@ def format(self, msg, *args, **kwargs): # noqa: A003
def display(self, msg, *args, **kwargs):
"""Display text to console, formatted for nxc"""
try:
if "protocol" in self.extra.keys() and not called_from_cmd_args():
if self.extra and "protocol" in self.extra and not called_from_cmd_args():
return
except AttributeError:
pass
Expand All @@ -88,7 +84,7 @@ def display(self, msg, *args, **kwargs):
def success(self, msg, color="green", *args, **kwargs):
"""Print some sort of success to the user"""
try:
if "protocol" in self.extra.keys() and not called_from_cmd_args():
if self.extra and "protocol" in self.extra and not called_from_cmd_args():
return
except AttributeError:
pass
Expand All @@ -101,7 +97,7 @@ def success(self, msg, color="green", *args, **kwargs):
def highlight(self, msg, *args, **kwargs):
"""Prints a completely yellow highlighted message to the user"""
try:
if "protocol" in self.extra.keys() and not called_from_cmd_args():
if self.extra and "protocol" in self.extra and not called_from_cmd_args():
return
except AttributeError:
pass
Expand All @@ -114,7 +110,7 @@ def highlight(self, msg, *args, **kwargs):
def fail(self, msg, color="red", *args, **kwargs):
"""Prints a failure (may or may not be an error) - e.g. login creds didn't work"""
try:
if "protocol" in self.extra.keys() and not called_from_cmd_args():
if self.extra and "protocol" in self.extra and not called_from_cmd_args():
return
except AttributeError:
pass
Expand Down
5 changes: 1 addition & 4 deletions nxc/modules/bh_owned.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ def options(self, context, module_options):
self.neo4j_pass = module_options["PASS"]

def on_admin_login(self, context, connection):
if context.local_auth:
domain = connection.conn.getServerDNSDomainName()
else:
domain = connection.domain
domain = connection.conn.getServerDNSDomainName() if context.local_auth else connection.domain

host_fqdn = f"{connection.hostname}.{domain}".upper()
uri = f"bolt://{self.neo4j_URI}:{self.neo4j_Port}"
Expand Down
2 changes: 1 addition & 1 deletion nxc/modules/daclread.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def get_user_info(self, context, samname):
# - sid : the SID to resolve
def resolveSID(self, context, sid):
# Tries to resolve the SID from the well known SIDs
if sid in WELL_KNOWN_SIDS.keys():
if sid in WELL_KNOWN_SIDS:
return WELL_KNOWN_SIDS[sid]
# Tries to resolve the SID from the LDAP domain dump
else:
Expand Down
2 changes: 1 addition & 1 deletion nxc/modules/empire_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def options(self, context, module_options):

api_proto = "https" if "SSL" in module_options else "http"

obfuscate = True if "OBFUSCATE" in module_options else False
obfuscate = "OBFUSCATE" in module_options
# we can use commands instead of backslashes - this is because Linux and OSX treat them differently
default_obfuscation = "Token,All,1"
obfuscate_cmd = module_options["OBFUSCATE_CMD"] if "OBFUSCATE_CMD" in module_options else default_obfuscation
Expand Down
11 changes: 2 additions & 9 deletions nxc/modules/keepass_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,7 @@ def poll(self, context, connection):
connection.conn.getFile(self.share, export_path.split(":")[1], buffer.write)

# if multiple exports found, add a number at the end of local path to prevent override
if count > 0:
local_full_path = self.local_export_path + "/" + self.export_name.split(".")[0] + "_" + str(count) + "." + self.export_name.split(".")[1]
else:
local_full_path = self.local_export_path + "/" + self.export_name
local_full_path = self.local_export_path + "/" + self.export_name.split(".")[0] + "_" + str(count) + "." + self.export_name.split(".")[1] if count > 0 else self.local_export_path + "/" + self.export_name

# downloads the exported database
with open(local_full_path, "wb") as f:
Expand Down Expand Up @@ -370,11 +367,7 @@ def trigger_added(self, context, connection):
sys.exit(1)

# check if the specified KeePass configuration file does not already contain the malicious trigger
for trigger in keepass_config_xml_root.findall(".//Application/TriggerSystem/Triggers/Trigger"):
if trigger.find("Name").text == self.trigger_name:
return True

return False
return any(trigger.find("Name").text == self.trigger_name for trigger in keepass_config_xml_root.findall(".//Application/TriggerSystem/Triggers/Trigger"))

def put_file_execute_delete(self, context, connection, psh_script_str):
"""Helper to upload script to a temporary folder, run then deletes it"""
Expand Down
5 changes: 1 addition & 4 deletions nxc/modules/laps.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ def options(self, context, module_options):

def on_login(self, context, connection):
context.log.display("Getting LAPS Passwords")
if self.computer is not None:
searchFilter = "(&(objectCategory=computer)(|(msLAPS-EncryptedPassword=*)(ms-MCS-AdmPwd=*)(msLAPS-Password=*))(name=" + self.computer + "))"
else:
searchFilter = "(&(objectCategory=computer)(|(msLAPS-EncryptedPassword=*)(ms-MCS-AdmPwd=*)(msLAPS-Password=*)))"
searchFilter = "(&(objectCategory=computer)(|(msLAPS-EncryptedPassword=*)(ms-MCS-AdmPwd=*)(msLAPS-Password=*))(name=" + self.computer + "))" if self.computer is not None else "(&(objectCategory=computer)(|(msLAPS-EncryptedPassword=*)(ms-MCS-AdmPwd=*)(msLAPS-Password=*)))"
attributes = [
"msLAPS-EncryptedPassword",
"msLAPS-Password",
Expand Down
2 changes: 1 addition & 1 deletion nxc/modules/ldap-checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def run_ldap(target, credential):
_, err = await ldapsClientConn.bind()
if "stronger" in str(err):
return True # because LDAP server signing requirements ARE enforced
elif ("data 52e" or "data 532") in str(err):
elif ("data 52e") in str(err):
context.log.fail("Not connected... exiting")
exit()
elif err is None:
Expand Down
5 changes: 1 addition & 4 deletions nxc/modules/ms17-010.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,7 @@ def check(ip, port=445):
nt_status = struct.pack("BBH", smb.error_class, smb.reserved1, smb.error_code)

# Check the NT status to determine if the vulnerability exists
if nt_status == "\x05\x02\x00\xc0":
return True
else:
return False
return nt_status == "\x05\x02\x00À"

except Exception:
return False
Expand Down
2 changes: 1 addition & 1 deletion nxc/modules/nanodump.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def on_admin_login(self, context, connection):
# apparently SMB exec methods treat the output parameter differently than MSSQL (we use it to display())
# if we don't do this, then SMB doesn't actually return the results of commands, so it appears that the
# execution fails, which it doesn't
display_output = True if self.context.protocol == "smb" else False
display_output = self.context.protocol == "smb"
self.context.log.debug(f"Display Output: {display_output}")
# get LSASS PID via `tasklist`
command = 'tasklist /v /fo csv | findstr /i "lsass"'
Expand Down
2 changes: 1 addition & 1 deletion nxc/modules/pso.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def options(self, context, module_options):
def convert_time_field(self, field, value):
time_fields = {"msDS-LockoutObservationWindow": (60, "mins"), "msDS-MinimumPasswordAge": (86400, "days"), "msDS-MaximumPasswordAge": (86400, "days"), "msDS-LockoutDuration": (60, "mins")}

if field in time_fields.keys():
if field in time_fields:
value = f"{int(fabs(float(value)) / (10000000 * time_fields[field][0]))} {time_fields[field][1]}"

return value
Expand Down
9 changes: 3 additions & 6 deletions nxc/modules/rdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from impacket.dcerpc.v5.dcom import wmi
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_PRIVACY
import contextlib


class NXCModule:
Expand Down Expand Up @@ -148,10 +149,8 @@ def rdp_wrapper(self, action):

if action == "enable":
self.query_rdp_port(remote_ops, reg_handle)
try:
with contextlib.suppress(Exception):
remote_ops.finish()
except Exception:
pass

def rdp_ram_wrapper(self, action):
remote_ops = RemoteOperations(self.__smbconnection, False)
Expand Down Expand Up @@ -183,10 +182,8 @@ def rdp_ram_wrapper(self, action):
elif int(data) == 1:
self.logger.success("Disable RDP Restricted Admin Mode via SMB(ncacn_np) succeed")

try:
with contextlib.suppress(Exception):
remote_ops.finish()
except Exception:
pass

def query_rdp_port(self, remoteOps, regHandle):
if remoteOps:
Expand Down
Loading

0 comments on commit d8b2931

Please sign in to comment.