Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module wcc added some defender checks #306

Merged
merged 10 commits into from
Sep 1, 2024
107 changes: 99 additions & 8 deletions nxc/modules/wcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from nxc.logger import nxc_logger
from impacket.dcerpc.v5 import rrp, samr, scmr
from impacket.dcerpc.v5.rrp import DCERPCSessionError
from impacket.dcerpc.v5.rpcrt import DCERPCException
from impacket.smbconnection import SessionError as SMBSessionError
from impacket.examples.secretsdump import RemoteOperations

Expand Down Expand Up @@ -88,11 +89,11 @@ class NXCModule:
supported_protocols = ["smb"]
opsec_safe = True
multiple_hosts = True

def __init__(self):
self.context = None
self.module_options = None

self.wcc_logger = logging.getLogger("WCC")
self.wcc_logger.propagate = False
log_filename = nxc_logger.init_log_file()
Expand Down Expand Up @@ -160,7 +161,7 @@ def __init__(self, context, connection):
def run(self):
self.init_checks()
self.check_config()

def init_checks(self):
# Declare the checks to do and how to do them
self.checks = [
Expand Down Expand Up @@ -190,7 +191,15 @@ def init_checks(self):
ConfigCheck("BitLocker configuration", "Checks the BitLocker configuration (based on https://www.stigviewer.com/stig/windows_10/2020-06-15/finding/V-94859)", checker_args=[[self, ("HKLM\\SOFTWARE\\Policies\\Microsoft\\FVE", "UseAdvancedStartup", 1), ("HKLM\\SOFTWARE\\Policies\\Microsoft\\FVE", "UseTPMPIN", 1)]]),
ConfigCheck("Guest account disabled", "Checks if the guest account is disabled", checkers=[self.check_guest_account_disabled]),
ConfigCheck("Automatic session lock enabled", "Checks if the session is automatically locked on after a period of inactivity", checker_args=[[self, ("HKCU\\Control Panel\\Desktop", "ScreenSaverIsSecure", 1), ("HKCU\\Control Panel\\Desktop", "ScreenSaveTimeOut", 300, le)]]),
ConfigCheck('Powershell Execution Policy == "Restricted"', 'Checks if the Powershell execution policy is set to "Restricted"', checker_args=[[self, ("HKLM\\SOFTWARE\\Microsoft\\PowerShell\\1\\ShellIds\\Microsoft.Powershell", "ExecutionPolicy", "Restricted\x00"), ("HKCU\\SOFTWARE\\Microsoft\\PowerShell\\1\\ShellIds\\Microsoft.Powershell", "ExecutionPolicy", "Restricted\x00")]], checker_kwargs=[{"options": {"KOIfMissing": False, "lastWins": True}}])
ConfigCheck('Powershell Execution Policy == "Restricted"', 'Checks if the Powershell execution policy is set to "Restricted"', checker_args=[[self, ("HKLM\\SOFTWARE\\Microsoft\\PowerShell\\1\\ShellIds\\Microsoft.Powershell", "ExecutionPolicy", "Restricted\x00"), ("HKCU\\SOFTWARE\\Microsoft\\PowerShell\\1\\ShellIds\\Microsoft.Powershell", "ExecutionPolicy", "Restricted\x00")]], checker_kwargs=[{"options": {"KOIfMissing": False, "lastWins": True}}]),
ConfigCheck("Defender service running", "Checks if defender service is enabled", checkers=[self.check_defender_service]),
ConfigCheck("Defender Tamper Protection enabled", "Check if Defender Tamper Protection is enabled", checker_args=[[self, ("HKLM\\Software\\Microsoft\\Windows Defender\\Features", "TamperProtection", 5)]]),
ConfigCheck("Defender RealTime Monitoring enabled", "Check if Defender RealTime Monitoring is enabled", checker_args=[[self, ("HKLM\\Software\\Policies\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableRealtimeMonitoring", 0), ("HKLM\\Software\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableRealtimeMonitoring", 0)]], checker_kwargs=[{"options": {"lastWins": True, "stopOnOK": True}}]),
ConfigCheck("Defender IOAV Protection enabled", "Check if Defender IOAV Protection is enabled", checker_args=[[self, ("HKLM\\Software\\Policies\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableIOAVProtection", 0), ("HKLM\\Software\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableIOAVProtection", 0)]], checker_kwargs=[{"options": {"lastWins": True, "stopOnOK": True}}]),
ConfigCheck("Defender Behaviour Monitoring enabled", "Check if Defender Behaviour Monitoring is enabled", checker_args=[[self, ("HKLM\\Software\\Policies\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableBehaviourMonitoring", 0), ("HKLM\\Software\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableBehaviourMonitoring", 0)]], checker_kwargs=[{"options": {"lastWins": True, "stopOnOK": True}}]),
ConfigCheck("Defender Script Scanning enabled", "Check if Defender Script Scanning is enabled", checker_args=[[self, ("HKLM\\Software\\Policies\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableScriptScanning", 0), ("HKLM\\Software\\Microsoft\\Windows Defender\\Real-Time Protection", "DisableScriptScanning", 0)]], checker_kwargs=[{"options": {"lastWins": True, "stopOnOK": True}}]),
ConfigCheck("Defender no path exlusions", "Checks Defender path exlusion", checkers=[self.check_defender_exclusion], checker_args=[("HKLM\\SOFTWARE\\Policies\\Microsoft\\Windows Defender\\Exclusions\\Paths", "HKLM\\SOFTWARE\\Microsoft\\Windows Defender\\Exclusions\\Paths")]),
ConfigCheck("Defender no extension exclusions", "Checks Defender extension exlusion", checkers=[self.check_defender_exclusion], checker_args=[("HKLM\\SOFTWARE\\Policies\\Microsoft\\Windows Defender\\Exclusions\\Extensions", "HKLM\\SOFTWARE\\Microsoft\\Windows Defender\\Exclusions\\Extensions")])
]

# Add check to conf_checks table if missing
Expand Down Expand Up @@ -243,7 +252,7 @@ def check_config(self):
if host_id is not None:
self.connection.db.add_check_result(host_id, check.check_id, check.ok, ", ".join(check.reasons).replace("\x00", ""))

def check_registry(self, *specs, options=None):
def check_registry(self, *specs, options=None, stop_on_error=False):
"""
Perform checks that only require to compare values in the registry with expected values, according to the specs
a spec may be either a 3-tuple: (key name, value name, expected value), or a 4-tuple (key name, value name, expected value, operation), where operation is a function that implements a comparison operator
Expand All @@ -261,14 +270,15 @@ def check_registry(self, *specs, options=None):
try:
if len(spec) == 3:
(key, value_name, expected_value) = spec
op = operator.eq
elif len(spec) == 4:
(key, value_name, expected_value, op) = spec
else:
ok = False
reasons = ["Check could not be performed (invalid specification provided)"]
return ok, reasons
except Exception as e:
self.module.log.error(f"Check could not be performed. Details: specs={specs}, dce={self.dce}, error: {e}")
self.context.log.error(f"Check could not be performed. Details: specs={specs}, dce={self.dce}, error: {e}")
return ok, reasons

if op == operator.eq:
Expand Down Expand Up @@ -308,6 +318,9 @@ def check_registry(self, *specs, options=None):
else:
ok = False
reasons.append(f"Error while retrieving value of {key}\\{value_name}: {value}")
if stop_on_error:
ok = None
return ok, reasons
continue

if op(value, expected_value):
Expand Down Expand Up @@ -479,6 +492,78 @@ def check_applocker(self):

return success, reasons

def get_exclusions(self, key_name):
exclusions = []
try:
values = self.reg_query_value(self.dce, self.connection, key_name, valueName=None, all_value=True)
for _, value_name, _ in values:
exclusions.append(value_name)
except Exception:
self.context.log.debug("No defender exclusion policies")

return len(exclusions), exclusions

def check_defender_exclusion(self, *spec, options=None):
try:
if len(spec) == 2:
(policy_key_name, key_name) = spec
else:
ok = False
reasons = ["Check could not be performed (invalid specification provided)"]
return ok, reasons
except Exception as e:
self.context.log.error(f"Check could not be performed. Details: spec={spec}, dce={self.dce}, error: {e}")
return ok, reasons

reasons = []
success = True

count, exclusions_p = self.get_exclusions(policy_key_name)
reasons = [f"Policy: [{', '.join(exclusions_p)}]"]
count_k, exclusions_k = self.get_exclusions(key_name)
reasons.append(f"Manual: [{', '.join(exclusions_k)}]")
count += count_k

if count > 0:
success = False

return success, reasons

def check_defender_service(self):
ok = True
raised = False
reasons = []
try:
service_config, service_status = self.get_service("windefend", self.connection)
if service_status == scmr.SERVICE_RUNNING:
reasons.append("windefend service running")
elif service_status == scmr.SERVICE_STOPPED:
ok = False
reasons.append("windefend service not running")
except DCERPCException as e:
ok = True
raised = True
reasons = [f"windefend service check error({e})"]
if ok is False or raised is True:
try:
service_config, service_status = self.get_service("sense", self.connection)
if service_status == scmr.SERVICE_RUNNING:
reasons.append("sense service running")
elif service_status == scmr.SERVICE_STOPPED:
ok = False
reasons.append("sense service not running")
except DCERPCException as e:
ok = True
raised = True
reasons.append(f"sense service check error({e})")
if raised is True:
reasons_save = reasons
args = ("HKLM\\SOFTWARE\\Microsoft\\Windows Defender", "IsServiceRunning", 1)
ok, reasons = self.check_registry(args)
reasons.extend(reasons_save)

return ok, reasons

def _open_root_key(self, dce, connection, root_key):
ans = None
retries = 1
Expand Down Expand Up @@ -527,7 +612,7 @@ def reg_get_subkeys(self, dce, connection, key_name):
break
return subkeys

def reg_query_value(self, dce, connection, keyName, valueName=None):
def reg_query_value(self, dce, connection, keyName, valueName=None, all_value=False):
"""Query remote registry data for a given registry value"""

def subkey_values(subkey_handle):
Expand Down Expand Up @@ -580,8 +665,10 @@ def get_value(subkey_handle, dwIndex=0):

subkey_handle = ans["phkResult"]

if valueName is None:
if valueName is None and all_value is False:
return get_value(subkey_handle)[2]
elif valueName is None and all_value is True:
return subkey_values(subkey_handle)
else:
for _, name, data in subkey_values(subkey_handle):
if name.upper() == valueName.upper():
Expand Down Expand Up @@ -635,15 +722,19 @@ def ls(self, smb, path="\\", share="C$"):
self.context.log.error(f"ls(): C:\\{path} {e}\n")
return file_listing


def le(reg_sz_string, number):
return int(reg_sz_string[:-1]) <= number


def in_(obj, seq):
return obj in seq


def startswith(string, start):
return string.startswith(start)


def not_(boolean_operator):
def wrapper(*args, **kwargs):
return not boolean_operator(*args, **kwargs)
Expand Down
Loading