diff --git a/README.md b/README.md index 5bddecd..9f8e191 100644 --- a/README.md +++ b/README.md @@ -181,7 +181,7 @@ Linux: `PATH="$(pwd):$PATH" protol --in-place --python-out ../src/wa_crypt_tools/proto protoc --proto-path=. *.proto` -Now all of the generated python classes should have their imports fixed. +Now all the generated python classes should have their imports fixed. --- diff --git a/src/wa_crypt_tools/lib/db/db.py b/src/wa_crypt_tools/lib/db/db.py index fbc9f0f..2e9721e 100644 --- a/src/wa_crypt_tools/lib/db/db.py +++ b/src/wa_crypt_tools/lib/db/db.py @@ -4,7 +4,8 @@ from wa_crypt_tools.lib.key.key import Key from wa_crypt_tools.lib.props import Props -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) + class Database(abc.ABC): """ diff --git a/src/wa_crypt_tools/lib/db/db12.py b/src/wa_crypt_tools/lib/db/db12.py index 9cd8fb4..beab24c 100644 --- a/src/wa_crypt_tools/lib/db/db12.py +++ b/src/wa_crypt_tools/lib/db/db12.py @@ -12,7 +12,9 @@ from wa_crypt_tools.lib.key.key14 import Key14 from wa_crypt_tools.lib.props import Props -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) + + class Database12(Database): """ Implementation of a crypt12 database. @@ -40,25 +42,25 @@ def __init__(self, key: Key14 = None, encrypted=None, if encrypted and key: self.cipher_version = encrypted.read(2) if self.cipher_version != key.get_cipher_version(): - l.error("Cipher version mismatch: {} != {}".format(self.cipher_version, key.get_cipher_version())) + log.error("Cipher version mismatch: {} != {}".format(self.cipher_version, key.get_cipher_version())) raise ValueError self.file_hash.update(self.cipher_version) self.key_version = encrypted.read(1) if self.key_version != key.get_key_version(): - l.error("Key version mismatch: {} != {}".format(self.key_version, key.get_key_version())) + log.error("Key version mismatch: {} != {}".format(self.key_version, key.get_key_version())) raise ValueError self.file_hash.update(self.key_version) self.serversalt = encrypted.read(32) if self.serversalt != key.get_serversalt(): - l.error("Server salt mismatch: {} != {}".format(self.serversalt, key.get_serversalt())) + log.error("Server salt mismatch: {} != {}".format(self.serversalt, key.get_serversalt())) raise ValueError self.file_hash.update(self.serversalt) self.googleid = encrypted.read(16) if self.googleid != key.get_googleid(): - l.error("Google ID mismatch: {} != {}".format(self.googleid, key.get_googleid())) + log.error("Google ID mismatch: {} != {}".format(self.googleid, key.get_googleid())) self.file_hash.update(self.googleid) self.iv = encrypted.read(16) @@ -106,7 +108,7 @@ def __init__(self, key: Key14 = None, encrypted=None, self.cipher_version = cipher_version self.file_hash.update(self.cipher_version) else: - l.error("Unsupported cipher version provided!") + log.error("Unsupported cipher version provided!") raise ValueError else: self.cipher_version = C.SUPPORTED_CIPHER_VERSION @@ -117,7 +119,7 @@ def __init__(self, key: Key14 = None, encrypted=None, self.key_version = key_version self.file_hash.update(self.key_version) else: - l.error("Unsupported key version provided!") + log.error("Unsupported key version provided!") else: self.key_version = C.SUPPORTED_KEY_VERSIONS[-1] self.file_hash.update(self.key_version) @@ -154,9 +156,9 @@ def decrypt(self, key: Key14, encrypted: bytes) -> bytes: crypt12_footer = str(userjid) jid = findall(r"(?:-|\d)(?:-|\d)(\d\d)", crypt12_footer) if len(jid) != 1: - l.error("The phone number end is not 2 characters long") + log.error("The phone number end is not 2 characters long") else: - l.debug("Your phone number ends with {}".format(jid[0])) + log.debug("Your phone number ends with {}".format(jid[0])) checksum = encrypted[-20:-4] authentication_tag = encrypted[-36:-20] encrypted_data = encrypted[:-36] @@ -170,14 +172,14 @@ def decrypt(self, key: Key14, encrypted: bytes) -> bytes: # TODO do crypt12 multifiles actually exist? is_multifile_backup = True else: - l.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest())) + log.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest())) cipher = AES.new(key.get(), AES.MODE_GCM, self.iv) try: output_decrypted: bytes = cipher.decrypt(encrypted_data) except ValueError as e: - l.fatal("Decryption failed: {}." - "\n This probably means your backup is corrupted.".format(e)) + log.fatal("Decryption failed: {}." + "\n This probably means your backup is corrupted.".format(e)) raise e # Verify the authentication tag @@ -193,12 +195,13 @@ def decrypt(self, key: Key14, encrypted: bytes) -> bytes: else: cipher.verify(authentication_tag) except ValueError as e: - l.error("Authentication tag mismatch: {}." - "\n This probably means your backup is corrupted.".format(e)) + log.error("Authentication tag mismatch: {}." + "\n This probably means your backup is corrupted.".format(e)) return output_decrypted + def encrypt(self, key: Key14, props: Props, decrypted: bytes) -> bytes: - hash = md5() + file_hash = md5() out = b"" out += self.cipher_version out += self.key_version @@ -209,16 +212,13 @@ def encrypt(self, key: Key14, props: Props, decrypted: bytes) -> bytes: encrypted = cipher.encrypt(decrypted) out += encrypted out += cipher.digest() - hash.update(out) - out += hash.digest() + file_hash.update(out) + out += file_hash.digest() jid = props.get_jid() if len(jid) != 2: - l.error("The phone number end is not 2 characters long") + log.error("The phone number end is not 2 characters long") out += "--{}".format(jid).encode() return out - - def get_iv(self) -> bytes: return self.iv - diff --git a/src/wa_crypt_tools/lib/db/db15.py b/src/wa_crypt_tools/lib/db/db15.py index 70d4767..4e4db6d 100644 --- a/src/wa_crypt_tools/lib/db/db15.py +++ b/src/wa_crypt_tools/lib/db/db15.py @@ -8,11 +8,12 @@ from wa_crypt_tools.lib.props import Props -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) from wa_crypt_tools.lib.db.db import Database from wa_crypt_tools.lib.key.key15 import Key15 + class Database15(Database): def __str__(self): return "Database15" @@ -28,23 +29,23 @@ def __init__(self, *, key: Key15 = None, encrypted=None, iv: bytes = None from wa_crypt_tools.proto import backup_prefix_pb2 as prefix from wa_crypt_tools.proto import key_type_pb2 as key_type except ImportError as e: - l.error("Could not import the proto classes: {}".format(e)) + log.error("Could not import the proto classes: {}".format(e)) if str(e).startswith("cannot import name 'builder' from 'google.protobuf.internal'"): - l.error("You need to upgrade the protobuf library to at least 3.20.0.\n" - " python -m pip install --upgrade protobuf") + log.error("You need to upgrade the protobuf library to at least 3.20.0.\n" + " python -m pip install --upgrade protobuf") elif str(e).startswith("no module named"): - l.error("Please download them and put them in the \"proto\" sub folder.") + log.error("Please download them and put them in the \"proto\" sub folder.") raise e except AttributeError as e: - l.error("Could not import the proto classes: {}\n ".format(e) + - "Your protobuf library is probably too old.\n " - "Please upgrade to at least version 3.20.0 , by running:\n " - "python -m pip install --upgrade protobuf") + log.error("Could not import the proto classes: {}\n ".format(e) + + "Your protobuf library is probably too old.\n " + "Please upgrade to at least version 3.20.0 , by running:\n " + "python -m pip install --upgrade protobuf") raise e self.header = prefix.BackupPrefix() - l.debug("Parsing database header...") + log.debug("Parsing database header...") try: @@ -62,7 +63,7 @@ def __init__(self, *, key: Key15 = None, encrypted=None, iv: bytes = None else: self.file_hash.update(encrypted.read(1)) if not msgstore_features_flag: - l.debug("No feature table found (not a msgstore DB or very old)") + log.debug("No feature table found (not a msgstore DB or very old)") try: @@ -70,47 +71,46 @@ def __init__(self, *, key: Key15 = None, encrypted=None, iv: bytes = None self.file_hash.update(protobuf_raw) if self.header.ParseFromString(protobuf_raw) != protobuf_size: - l.error("Protobuf message not fully read. Please report a bug.") + log.error("Protobuf message not fully read. Please report a bug.") else: # Checking and printing WA version and phone number version = findall(r"\d(?:\.\d{1,3}){3}", self.header.info.app_version) if len(version) != 1: - l.error('WhatsApp version not found') + log.error('WhatsApp version not found') else: - l.debug("WhatsApp version: {}".format(version[0])) + log.debug("WhatsApp version: {}".format(version[0])) if len(self.header.info.jidSuffix) != 2: - l.error("The phone number end is not 2 characters long") - l.debug("Your phone number ends with {}".format(self.header.info.jidSuffix)) + log.error("The phone number end is not 2 characters long") + log.debug("Your phone number ends with {}".format(self.header.info.jidSuffix)) if len(self.header.c15_iv.IV) != 0: # DB Header is crypt15 # if type(key) is not Key15: # l.error("You are using a crypt14 key file with a crypt15 backup.") if len(self.header.c15_iv.IV) != 16: - l.error("IV is not 16 bytes long but is {} bytes long".format(len(self.header.c15_iv.IV))) + log.error( + "IV is not 16 bytes long but is {} bytes long".format(len(self.header.c15_iv.IV))) iv = self.header.c15_iv.IV elif len(self.header.c14_cipher.IV) != 0: raise ValueError("Crypt14 file in crypt15 constructor!") else: - l.error("Could not parse the IV from the protobuf message. Please report a bug.") + log.error("Could not parse the IV from the protobuf message. Please report a bug.") raise ValueError - - except DecodeError as e: - l.error("Could not parse the protobuf message: {}".format(e)) + log.error("Could not parse the protobuf message: {}".format(e)) raise e except OSError as e: - l.fatal("Reading database header failed: {}".format(e)) + log.fatal("Reading database header failed: {}".format(e)) raise e else: if iv: if len(iv) != 16: - l.error("IV is not 16 bytes long but is {} bytes long".format(len(iv))) + log.error("IV is not 16 bytes long but is {} bytes long".format(len(iv))) self.iv = iv else: self.iv = urandom(16) @@ -129,14 +129,14 @@ def decrypt(self, key: Key15, encrypted: bytes) -> bytes: # We are probably in a multifile backup, which does not have a checksum. is_multifile_backup = True else: - l.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest())) + log.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest())) cipher = AES.new(key.get(), AES.MODE_GCM, self.iv) try: output_decrypted: bytes = cipher.decrypt(encrypted_data) except ValueError as e: - l.fatal("Decryption failed: {}." - "\n This probably means your backup is corrupted.".format(e)) + log.fatal("Decryption failed: {}." + "\n This probably means your backup is corrupted.".format(e)) raise e # Verify the authentication tag @@ -152,8 +152,8 @@ def decrypt(self, key: Key15, encrypted: bytes) -> bytes: else: cipher.verify(authentication_tag) except ValueError as e: - l.error("Authentication tag mismatch: {}." - "\n This probably means your backup is corrupted.".format(e)) + log.error("Authentication tag mismatch: {}." + "\n This probably means your backup is corrupted.".format(e)) return output_decrypted @@ -186,5 +186,6 @@ def encrypt(self, key: Key15, props: Props, decrypted: bytes) -> bytes: file_hash.update(authentication_tag) out += file_hash.digest() return out + def get_iv(self) -> bytes: - return self.iv \ No newline at end of file + return self.iv diff --git a/src/wa_crypt_tools/lib/db/dbfactory.py b/src/wa_crypt_tools/lib/db/dbfactory.py index d8aa9f4..0a3b56a 100644 --- a/src/wa_crypt_tools/lib/db/dbfactory.py +++ b/src/wa_crypt_tools/lib/db/dbfactory.py @@ -9,7 +9,7 @@ from wa_crypt_tools.lib.props import Props from wa_crypt_tools.lib.utils import header_info -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) from hashlib import md5 from re import findall @@ -22,23 +22,23 @@ def from_file(encrypted): from wa_crypt_tools.proto import backup_prefix_pb2 as prefix from wa_crypt_tools.proto import key_type_pb2 as key_type except ImportError as e: - l.error("Could not import the proto classes: {}".format(e)) + log.error("Could not import the proto classes: {}".format(e)) if str(e).startswith("cannot import name 'builder' from 'google.protobuf.internal'"): - l.error("You need to upgrade the protobuf library to at least 3.20.0.\n" - " python -m pip install --upgrade protobuf") + log.error("You need to upgrade the protobuf library to at least 3.20.0.\n" + " python -m pip install --upgrade protobuf") elif str(e).startswith("no module named"): - l.error("Please download them and put them in the \"proto\" sub folder.") + log.error("Please download them and put them in the \"proto\" sub folder.") raise e except AttributeError as e: - l.error("Could not import the proto classes: {}\n ".format(e) + - "Your protobuf library is probably too old.\n " - "Please upgrade to at least version 3.20.0 , by running:\n " - "python -m pip install --upgrade protobuf") + log.error("Could not import the proto classes: {}\n ".format(e) + + "Your protobuf library is probably too old.\n " + "Please upgrade to at least version 3.20.0 , by running:\n " + "python -m pip install --upgrade protobuf") raise e header = prefix.BackupPrefix() - l.debug("Parsing database header...") + log.debug("Parsing database header...") try: file_hash = md5() @@ -56,7 +56,7 @@ def from_file(encrypted): else: file_hash.update(encrypted.read(1)) if not msgstore_features_flag: - l.debug("No feature table found (not a msgstore DB or very old)") + log.debug("No feature table found (not a msgstore DB or very old)") try: @@ -64,25 +64,25 @@ def from_file(encrypted): file_hash.update(protobuf_raw) if header.ParseFromString(protobuf_raw) != protobuf_size: - l.error("Protobuf message not fully read. Please report a bug.") + log.error("Protobuf message not fully read. Please report a bug.") else: # Checking and printing WA version and phone number version = findall(r"\d(?:\.\d{1,3}){3}", header.info.app_version) if len(version) != 1: - l.error('WhatsApp version not found') + log.error('WhatsApp version not found') else: - l.debug("WhatsApp version: {}".format(version[0])) + log.debug("WhatsApp version: {}".format(version[0])) if len(header.info.jidSuffix) != 2: - l.error("The phone number end is not 2 characters long") - l.debug("Your phone number ends with {}".format(header.info.jidSuffix)) + log.error("The phone number end is not 2 characters long") + log.debug("Your phone number ends with {}".format(header.info.jidSuffix)) if len(header.c15_iv.IV) != 0: # DB Header is crypt15 # if type(key) is not Key15: # l.error("You are using a crypt14 key file with a crypt15 backup.") if len(header.c15_iv.IV) != 16: - l.error("IV is not 16 bytes long but is {} bytes long".format(len(header.c15_iv.IV))) + log.error("IV is not 16 bytes long but is {} bytes long".format(len(header.c15_iv.IV))) iv = header.c15_iv.IV elif len(header.c14_cipher.IV) != 0: @@ -95,38 +95,31 @@ def from_file(encrypted): # l.error("Cipher version mismatch: {} != {}" # .format(key.cipher_version, p.c14_cipher.cipher_version)) - # Fix bytes to string encoding - # key.key_version = (key.key_version[0] + 48).to_bytes(1, byteorder='big') - # if key.key_version != p.c14_cipher.key_version: - # if key.key_version > p.c14_cipher.key_version: - # l.error("Key version mismatch: {} != {} .\n " - # .format(key.key_version, p.c14_cipher.key_version) + - # "Your backup is too old for this key file.\n " + - # "Please try using a newer backup.") - # elif key.key_version < p.c14_cipher.key_version: - # l.error("Key version mismatch: {} != {} .\n " - # .format(key.key_version, p.c14_cipher.key_version) + - # "Your backup is too new for this key file.\n " + - # "Please try using an older backup, or getting the new key.") - # else: - # l.error("Key version mismatch: {} != {} (?)" - # .format(key.key_version, p.c14_cipher.key_version)) - # if key.get_serversalt() != p.c14_cipher.server_salt: - # l.error("Server salt mismatch: {} != {}".format(key.get_serversalt(), p.c14_cipher.server_salt)) - # if key.get_googleid() != p.c14_cipher.google_id: - # l.error("Google ID mismatch: {} != {}".format(key.get_googleid(), p.c14_cipher.google_id)) + # Fix bytes to string encoding key.key_version = (key.key_version[0] + 48).to_bytes(1, + # byteorder='big') if key.key_version != p.c14_cipher.key_version: if key.key_version > + # p.c14_cipher.key_version: l.error("Key version mismatch: {} != {} .\n " .format( + # key.key_version, p.c14_cipher.key_version) + "Your backup is too old for this key file.\n + # " + "Please try using a newer backup.") elif key.key_version < p.c14_cipher.key_version: + # l.error("Key version mismatch: {} != {} .\n " .format(key.key_version, + # p.c14_cipher.key_version) + "Your backup is too new for this key file.\n " + "Please try + # using an older backup, or getting the new key.") else: l.error("Key version mismatch: {} != + # {} (?)" .format(key.key_version, p.c14_cipher.key_version)) if key.get_serversalt() != + # p.c14_cipher.server_salt: l.error("Server salt mismatch: {} != {}".format( + # key.get_serversalt(), p.c14_cipher.server_salt)) if key.get_googleid() != + # p.c14_cipher.google_id: l.error("Google ID mismatch: {} != {}".format(key.get_googleid(), + # p.c14_cipher.google_id)) if len(header.c14_cipher.IV) != 16: - l.error( + log.error( "IV is not 16 bytes long but is {} bytes long".format(len(header.c14_cipher.IV))) iv = header.c14_cipher.IV else: - l.error("Could not parse the IV from the protobuf message. Please report a bug.") + log.error("Could not parse the IV from the protobuf message. Please report a bug.") raise DecodeError # We are done here - l.debug(header_info(header)) - + log.debug(header_info(header)) + props = Props(v_features=header.info) if header.c15_iv.IV: db = Database15(iv=iv, props=props) @@ -137,20 +130,19 @@ def from_file(encrypted): db.file_hash = file_hash return db else: - l.error("Could not parse the IV from the protobuf message. Please report a bug.") + log.error("Could not parse the IV from the protobuf message. Please report a bug.") raise DecodeError except DecodeError: # try again as a crypt12 - l.debug("Could not parse the protobuf message as a crypt14/15. Trying as a crypt12...") + log.debug("Could not parse the protobuf message as a crypt14/15. Trying as a crypt12...") try: encrypted.seek(0) except OSError as e: - l.fatal("Could not reset the file pointer: {}".format(e)) + log.fatal("Could not reset the file pointer: {}".format(e)) raise e return Database12(encrypted=encrypted) - except OSError as e: - l.fatal("Reading database header failed: {}".format(e)) + log.fatal("Reading database header failed: {}".format(e)) diff --git a/src/wa_crypt_tools/lib/key/key14.py b/src/wa_crypt_tools/lib/key/key14.py index fcc7ea0..6cf5c6f 100644 --- a/src/wa_crypt_tools/lib/key/key14.py +++ b/src/wa_crypt_tools/lib/key/key14.py @@ -8,9 +8,10 @@ from wa_crypt_tools.lib.utils import create_jba import logging -l = logging.getLogger(__name__) -# Thanks to python, these all have to go to the same file +log = logging.getLogger(__name__) + + class Key14(Key): # These constants are only used with crypt12/14 keys. __SUPPORTED_CIPHER_VERSION = b'\x00\x01' @@ -18,7 +19,7 @@ class Key14(Key): def __init__(self, keyarray: bytes = None, cipher_version: bytes = None, key_version: bytes = None, - serversalt: bytes = None, googleid: bytes = None, hashedgoogleid: bytes = None, + serversalt: bytes = None, googleid: bytes = None, hashedgoogleid: bytes = None, iv: bytes = None, key: bytes = None): """Extracts the fields from a crypt14 loaded key file.""" # key file format and encoding explanation: @@ -42,53 +43,53 @@ def __init__(self, keyarray: bytes = None, self.__cipher_version = self.__SUPPORTED_CIPHER_VERSION else: if cipher_version != self.__SUPPORTED_CIPHER_VERSION: - l.error("Invalid cipher version: {}".format(cipher_version.hex())) + log.error("Invalid cipher version: {}".format(cipher_version.hex())) self.__cipher_version = cipher_version if key_version is None: self.__key_version = self.__SUPPORTED_KEY_VERSIONS[-1] else: if key_version not in self.__SUPPORTED_KEY_VERSIONS: - l.error("Invalid key version: {}".format(key_version.hex())) + log.error("Invalid key version: {}".format(key_version.hex())) self.__key_version = key_version if serversalt is None: self.__serversalt = urandom(32) else: if len(serversalt) != 32: - l.error("Invalid server salt length: {}".format(serversalt.hex())) + log.error("Invalid server salt length: {}".format(serversalt.hex())) self.__serversalt = serversalt if googleid is None: self.__googleid = urandom(16) else: if len(googleid) != 16: - l.error("Invalid google id length: {}".format(googleid.hex())) + log.error("Invalid google id length: {}".format(googleid.hex())) self.__googleid = googleid if hashedgoogleid is None: self.__hashedgoogleid = sha256(self.__googleid).digest() else: - l.warning("Using supplied hashed google id") + log.warning("Using supplied hashed google id") if len(hashedgoogleid) != 32: - l.error("Invalid hashed google id length: {}".format(hashedgoogleid.hex())) + log.error("Invalid hashed google id length: {}".format(hashedgoogleid.hex())) self.__hashedgoogleid = hashedgoogleid if iv is None: self.__padding = b'\x00' * 16 else: if len(iv) != 16: - l.error("Invalid IV length: {}".format(iv.hex())) + log.error("Invalid IV length: {}".format(iv.hex())) if iv != b'\x00' * 16: - l.warning("IV should be empty") + log.warning("IV should be empty") self.__padding = iv if key is None: self.__key = urandom(32) else: if len(key) != 32: - l.error("Invalid key length: {}".format(key.hex())) + log.error("Invalid key length: {}".format(key.hex())) self.__key = key return # Check if the keyfile has a supported cipher version self.__cipher_version = keyarray[:len(self.__SUPPORTED_CIPHER_VERSION)] if self.__SUPPORTED_CIPHER_VERSION != self.__cipher_version: - l.error("Invalid keyfile: Unsupported cipher version {}" - .format(keyarray[:len(self.__SUPPORTED_CIPHER_VERSION)].hex())) + log.error("Invalid keyfile: Unsupported cipher version {}" + .format(keyarray[:len(self.__SUPPORTED_CIPHER_VERSION)].hex())) index = len(self.__SUPPORTED_CIPHER_VERSION) # Check if the keyfile has a supported key version @@ -99,8 +100,8 @@ def __init__(self, keyarray: bytes = None, self.__key_version = v break if not version_supported: - l.error('Invalid keyfile: Unsupported key version {}' - .format(keyarray[index:index + len(self.__SUPPORTED_KEY_VERSIONS[0])].hex())) + log.error('Invalid keyfile: Unsupported key version {}' + .format(keyarray[index:index + len(self.__SUPPORTED_KEY_VERSIONS[0])].hex())) self.__serversalt = keyarray[3:35] @@ -109,8 +110,8 @@ def __init__(self, keyarray: bytes = None, expected_digest = sha256(self.__googleid).digest() actual_digest = keyarray[51:83] if expected_digest != actual_digest: - l.error("Invalid keyfile: Invalid SHA-256 of salt.\n " - "Expected: {}\n Got:{}".format(expected_digest, actual_digest)) + log.error("Invalid keyfile: Invalid SHA-256 of salt.\n " + "Expected: {}\n Got:{}".format(expected_digest, actual_digest)) self.__hashedgoogleid = actual_digest @@ -119,12 +120,12 @@ def __init__(self, keyarray: bytes = None, # Check if IV is made of zeroes for byte in self.__padding: if byte: - l.error("Invalid keyfile: IV is not zeroed out but is: {}".format(self.__padding.hex())) + log.error("Invalid keyfile: IV is not zeroed out but is: {}".format(self.__padding.hex())) break self.__key = keyarray[99:] - l.info("Crypt12/14 key loaded") + log.info("Crypt12/14 key loaded") def get(self) -> bytes: return self.__key @@ -141,7 +142,6 @@ def get_cipher_version(self) -> bytes: def get_key_version(self) -> bytes: return self.__key_version - def __str__(self) -> str: """Returns a string representation of the key""" try: diff --git a/src/wa_crypt_tools/lib/key/key15.py b/src/wa_crypt_tools/lib/key/key15.py index cd9de0a..7443946 100644 --- a/src/wa_crypt_tools/lib/key/key15.py +++ b/src/wa_crypt_tools/lib/key/key15.py @@ -9,12 +9,15 @@ from wa_crypt_tools.lib.key.key import Key import logging + l = logging.getLogger(__name__) + + class Key15(Key): # This constant is only used with crypt15 keys. BACKUP_ENCRYPTION = b'backup encryption' - def __init__(self, keyarray: bytes=None, key: bytes=None): + def __init__(self, keyarray: bytes = None, key: bytes = None): """Extracts the key from a loaded crypt15 key file.""" # encrypted_backup.key file format and encoding explanation: # The E2E key file is actually a serialized byte[] object. @@ -57,7 +60,7 @@ def get(self) -> bytes: return encryptionloop( first_iteration_data=self.__key, message=b'backup encryption', - outputBytes=32) + output_bytes=32) def get_root(self) -> bytes: """ @@ -72,7 +75,7 @@ def get_metadata_encryption(self) -> bytes: return encryptionloop( first_iteration_data=self.__key, message=b'metadata encryption', - outputBytes=32) + output_bytes=32) def get_metadata_authentication(self) -> bytes: """ @@ -81,7 +84,7 @@ def get_metadata_authentication(self) -> bytes: return encryptionloop( first_iteration_data=self.__key, message=b'metadata authentication', - outputBytes=32) + output_bytes=32) def dump(self) -> bytes: """Dumps the key""" diff --git a/src/wa_crypt_tools/lib/logformat.py b/src/wa_crypt_tools/lib/logformat.py index 4f8ee23..da30fc3 100644 --- a/src/wa_crypt_tools/lib/logformat.py +++ b/src/wa_crypt_tools/lib/logformat.py @@ -1,6 +1,7 @@ import logging -class CustomFormatter(logging.Formatter): + +class CustomFormatter(logging.Formatter): grey = "\x1b[38;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" @@ -19,4 +20,4 @@ class CustomFormatter(logging.Formatter): def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) - return formatter.format(record) \ No newline at end of file + return formatter.format(record) diff --git a/src/wa_crypt_tools/lib/utils.py b/src/wa_crypt_tools/lib/utils.py index 09b4a2a..466e228 100644 --- a/src/wa_crypt_tools/lib/utils.py +++ b/src/wa_crypt_tools/lib/utils.py @@ -80,21 +80,21 @@ def javaintlist2bytes(barr: JavaArray) -> bytes: def encryptionloop(*, first_iteration_data: bytes, privateseed: bytes = b'\x00' * 32, message: bytes, - outputBytes: int): + output_bytes: int): # The private key and the seed are used to create the HMAC key privatekey = hmac.new(privateseed, msg=first_iteration_data, digestmod=sha256).digest() data = b'' output = b'' - numPermutations = int(math.ceil(float(outputBytes) / float(32))) + permutations = int(math.ceil(float(output_bytes) / float(32))) i = 1 - while i < numPermutations + 1: + while i < permutations + 1: hasher = hmac.new(privatekey, msg=data, digestmod=sha256) if message is not None: hasher.update(message) hasher.update(i.to_bytes(1, byteorder='big')) data = hasher.digest() - bytestowrite = min(outputBytes, len(data)) + bytestowrite = min(output_bytes, len(data)) output += data[:bytestowrite] i += 1 return output @@ -104,20 +104,20 @@ def mcrypt1_metadata_decrypt(*, key, encoded: str): """ Decrypts the metadata of a mcrypt1 file. :param key: The key used to decrypt the metadata - :param encoded: The metadata downloaded from google drive in base64 + :param encoded: The metadata downloaded from Google Drive in base64 :return: The decrypted JSON """ # Base64 decoding encoded = base64.b64decode(encoded) # PKCS5Padding is not natively supported unpad = lambda s: s[:-ord(s[len(s) - 1:])] - ivSize = encoded[0] - if ivSize != 16: + iv_size = encoded[0] + if iv_size != 16: raise Exception("IV Size is not 16") iv = encoded[1:17] - macSize = encoded[17] - if macSize != 32: + mac_size = encoded[17] + if mac_size != 32: raise Exception("MAC Size is not 32") mac = encoded[18:50] @@ -152,6 +152,7 @@ def get_mcrypt1_name(*, key, name: str, md5: bytes) -> bytes: media_hash = hmac_n.digest() return media_hash + def header_info(header): """ shows all header, information including the feature vector @@ -175,11 +176,11 @@ def header_info(header): string += str("The last two numbers of the user's Jid: {}\n".format(header.info.jidSuffix)) string += str("Backup version: {}\n".format(header.info.backup_version)) #string += str("Size of the backup file: {}".format(header.backup_export_file_size)) - features = [n for n in [*range(5,38), 39] if getattr(header.info, "f_" + str(n)) == True] + features = [n for n in [*range(5, 38), 39] if getattr(header.info, "f_" + str(n)) == True] if len(features) > 0: string += str("Features: {}\n".format(features)) string += str("Max feature number: {}\n".format(max(features))) - else: + else: string += str("No feature table found (not a msgstore DB or very old)\n") - - return string \ No newline at end of file + + return string diff --git a/src/wa_crypt_tools/wacreatekey.py b/src/wa_crypt_tools/wacreatekey.py index 470c774..b341795 100644 --- a/src/wa_crypt_tools/wacreatekey.py +++ b/src/wa_crypt_tools/wacreatekey.py @@ -67,8 +67,8 @@ def main(): lo.warning("Server salt not specified, a random one will be generated.") if args.googleid is None: lo.warning("Google id not specified, a random one will be generated.") - key: Key14 = Key14(cipher_version=args.cipher_version.to_bytes(2,"big"), - key_version=args.key_version.to_bytes(1,"big"), + key: Key14 = Key14(cipher_version=args.cipher_version.to_bytes(2, "big"), + key_version=args.key_version.to_bytes(1, "big"), serversalt=args.server_salt, googleid=args.googleid, iv=None, key=None) else: diff --git a/src/wa_crypt_tools/wadecrypt.py b/src/wa_crypt_tools/wadecrypt.py index b6a0b84..8f2a17c 100644 --- a/src/wa_crypt_tools/wadecrypt.py +++ b/src/wa_crypt_tools/wadecrypt.py @@ -25,17 +25,17 @@ if not hasattr(AES, 'MODE_GCM'): # pycrypto raise ModuleNotFoundError("You installed pycrypto and not pycryptodome(x). " - "Pycrypto is old, deprecated and not supported. \n" - "Run: python -m pip uninstall pycrypto\n" - "And: python -m pip install pycryptodomex\n" - "Or: python -m pip install pycryptodome") + "Pycrypto is old, deprecated and not supported. \n" + "Run: python -m pip uninstall pycrypto\n" + "And: python -m pip install pycryptodomex\n" + "Or: python -m pip install pycryptodome") except ModuleNotFoundError: # crypto (or nothing) raise ModuleNotFoundError("You need pycryptodome(x) to run these scripts!\n" - "python -m pip install pycryptodome\n" - "Or: python -m pip install pycryptodome\n" - "You can also remove \"crypto\" if you have it installed\n" - "python -m pip uninstall crypto") + "python -m pip install pycryptodome\n" + "Or: python -m pip install pycryptodome\n" + "You can also remove \"crypto\" if you have it installed\n" + "python -m pip uninstall crypto") # noinspection PyPackageRequirements # This is from javaobj-py3 @@ -56,8 +56,8 @@ __status__ = 'Production' import logging -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) def parsecmdline() -> argparse.Namespace: @@ -92,12 +92,12 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = z_obj = zlib.decompressobj() if cipher is None: - l.fatal("Could not create a decryption cipher") + log.fatal("Could not create a decryption cipher") try: if buffer_size < 17: - l.info("Invalid buffer size, will use default of {}".format(io.DEFAULT_BUFFER_SIZE)) + log.info("Invalid buffer size, will use default of {}".format(io.DEFAULT_BUFFER_SIZE)) buffer_size = io.DEFAULT_BUFFER_SIZE # Does the thing above but only with DEFAULT_BUFFER_SIZE bytes at a time. @@ -107,7 +107,7 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = chunk = encrypted.read(buffer_size) - l.debug("Reading and decrypting...") + log.debug("Reading and decrypting...") while next_chunk := encrypted.read(buffer_size): @@ -120,7 +120,7 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = try: next_chunk = encrypted.read(buffer_size) except MemoryError: - l.fatal("Out of RAM, please use a smaller buffer size.") + log.fatal("Out of RAM, please use a smaller buffer size.") if len(next_chunk) <= 36: # Last bytes read. Three cases: @@ -147,9 +147,9 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = decrypted.write(z_obj.decompress(decrypted_chunk)) except zlib.error: if test_decompression(decrypted_chunk): - l.info("Decrypted data is a ZIP file that I will not decompress automatically.") + log.info("Decrypted data is a ZIP file that I will not decompress automatically.") else: - l.error("I can't recognize decrypted data. Decryption not successful.\n " + log.error("I can't recognize decrypted data. Decryption not successful.\n " "The key probably does not match with the encrypted file.") is_zip = False decrypted.write(decrypted_chunk) @@ -165,7 +165,7 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = if len(jid) == 1: # Confirmed to be crypt12 checksum = checksum[:-4] - l.debug("Your phone number ends with {}".format(jid[0])) + log.debug("Your phone number ends with {}".format(jid[0])) else: # Shift everything forward by 4 bytes chunk = checksum[:4] @@ -178,7 +178,7 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = else: decrypted.write(z_obj.decompress(decrypted_chunk)) except zlib.error: - l.error("Backup is corrupted.") + log.error("Backup is corrupted.") decrypted.write(decrypted_chunk) else: decrypted.write(decrypted_chunk) @@ -188,7 +188,7 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = if file_hash.digest() != checksum[16:]: is_multifile_backup = True else: - l.debug("Checksum OK ({})!".format(file_hash.hexdigest())) + log.debug("Checksum OK ({})!".format(file_hash.hexdigest())) try: if is_multifile_backup: decrypted.write(cipher.decrypt(checksum[:16])) @@ -196,19 +196,19 @@ def chunked_decrypt(file_hash, cipher, encrypted, decrypted, buffer_size: int = else: cipher.verify(checksum[:16]) except ValueError as e: - l.error("Authentication tag mismatch: {}." + log.error("Authentication tag mismatch: {}." "\n This probably means your backup is corrupted.".format(e)) break chunk = next_chunk if is_zip and not z_obj.eof: - l.error("The encrypted database file is truncated (damaged).") + log.error("The encrypted database file is truncated (damaged).") decrypted.flush() except OSError as e: - l.fatal("I/O error: {}".format(e)) + log.fatal("I/O error: {}".format(e)) finally: decrypted.close() @@ -219,20 +219,20 @@ def main(): args = parsecmdline() # set wa_crypt_tools l to debug - l.setLevel(logging.DEBUG if args.verbose else logging.INFO) + log.setLevel(logging.DEBUG if args.verbose else logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG if args.verbose else logging.INFO) ch.setFormatter(CustomFormatter()) - l.addHandler(ch) + log.addHandler(ch) # also add to "wa_crypt_tools.lib" logger logging.getLogger("wa_crypt_tools.lib").addHandler(ch) logging.getLogger("wa_crypt_tools.lib").setLevel(logging.DEBUG if args.verbose else logging.INFO) if args.buffer_size is not None: if not 1 < args.buffer_size < maxsize: - l.fatal("Invalid buffer size") + log.fatal("Invalid buffer size") # Get the decryption key from the key file or the hex encoded string. key = KeyFactory.new(args.keyfile) - l.debug(str(key)) + log.debug(str(key)) db = DatabaseFactory.from_file(args.encrypted) cipher = AES.new(key.get(), AES.MODE_GCM, db.get_iv()) @@ -240,7 +240,8 @@ def main(): if args.buffer_size is not None: chunked_decrypt(db.file_hash, cipher, args.encrypted, args.decrypted, args.buffer_size, args.no_decompress) elif args.no_mem: - chunked_decrypt(db.file_hash, cipher, args.encrypted, args.decrypted, io.DEFAULT_BUFFER_SIZE, args.no_decompress) + chunked_decrypt(db.file_hash, cipher, args.encrypted, args.decrypted, io.DEFAULT_BUFFER_SIZE, + args.no_decompress) else: output_decrypted: bytearray = db.decrypt(key, args.encrypted.read()) try: @@ -251,24 +252,23 @@ def main(): else: output_file = z_obj.decompress(output_decrypted) if not z_obj.eof: - l.error("The encrypted database file is truncated (damaged).") + log.error("The encrypted database file is truncated (damaged).") except zlib.error: output_file = output_decrypted if test_decompression(output_file[:io.DEFAULT_BUFFER_SIZE]): - l.info("Decrypted data is a ZIP file that I will not decompress automatically.") + log.info("Decrypted data is a ZIP file that I will not decompress automatically.") else: - l.error("I can't recognize decrypted data. Decryption not successful.\n " + log.error("I can't recognize decrypted data. Decryption not successful.\n " "The key probably does not match with the encrypted file.\n " "Or the backup is simply empty. (check with --force)") args.decrypted.write(output_file) - if date.today().day == 1 and date.today().month == 4: - l.info("Done. Uploading messages to the developer's server...") + log.info("Done. Uploading messages to the developer's server...") sleep(0.5) - l.info("Uploaded. The developer will now read and publish your messages!") + log.info("Uploaded. The developer will now read and publish your messages!") else: - l.info("Done") + log.info("Done") if __name__ == "__main__": diff --git a/src/wa_crypt_tools/waencrypt.py b/src/wa_crypt_tools/waencrypt.py index 5308f75..a9b944a 100644 --- a/src/wa_crypt_tools/waencrypt.py +++ b/src/wa_crypt_tools/waencrypt.py @@ -15,7 +15,9 @@ from wa_crypt_tools.lib.logformat import CustomFormatter from wa_crypt_tools.lib.props import Props -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) + + def parsecmdline() -> argparse.Namespace: """Parses the command line arguments.""" """Sets up the argument parser""" @@ -30,32 +32,42 @@ def parsecmdline() -> argparse.Namespace: parser.add_argument('-f', '--force', action='store_true', help='Makes errors non fatal. Default: false') parser.add_argument('-v', '--verbose', action='store_true', help='Prints all offsets and messages') - parser.add_argument('--enable-features', type=int, nargs='*', default=C.DEFAULT_FEATURE_LIST, help='Enables the specified features. ') - parser.add_argument('--max-feature', type=int, default=39, help='The max feature number, the older is the backup the lower should be the number. ') - parser.add_argument('--multi-file', action='store_true', help='Encrypts a multi-file backup (either stickers or wallpapers)') - parser.add_argument('--type', type=int, choices=[12, 14, 15], default=15, help='The type of encryption to use. Default: 15') + parser.add_argument('--enable-features', type=int, nargs='*', default=C.DEFAULT_FEATURE_LIST, + help='Enables the specified features. ') + parser.add_argument('--max-feature', type=int, default=39, + help='The max feature number, the older is the backup the lower should be the number. ') + parser.add_argument('--multi-file', action='store_true', + help='Encrypts a multi-file backup (either stickers or wallpapers)') + parser.add_argument('--type', type=int, choices=[12, 14, 15], default=15, + help='The type of encryption to use. Default: 15') parser.add_argument('--iv', type=str, help='The IV to use for crypt15 encryption. Default: random') - parser.add_argument('--reference', type=argparse.FileType('rb'), help='The reference file to use for crypt15 encryption. Highly recommended.') - parser.add_argument('--noparse', action='store_true', help='Do not parse the header of the reference file. Default: false') - parser.add_argument('--wa-version', type=str, default=C.DEFAULT_APP_VERSION, help='The WhatsApp version to use for crypt15 encryption. Default:' + - C.DEFAULT_APP_VERSION) - parser.add_argument('--jid', type=str, default=C.DEFAULT_JID_SUFFIX, help='The last 2 numbers of your phone number. Default: 00') - parser.add_argument('--backup-version', type=int, default=C.DEFAULT_BACKUP_VERSION, help='The backup version to use in the header of the encrypted file. Default: 0') - parser.add_argument('--no-compress', action='store_true', help='Do not compress the file. This will make the backup not working. Only used in develpiomente. Default: false') + parser.add_argument('--reference', type=argparse.FileType('rb'), + help='The reference file to use for crypt15 encryption. Highly recommended.') + parser.add_argument('--noparse', action='store_true', + help='Do not parse the header of the reference file. Default: false') + parser.add_argument('--wa-version', type=str, default=C.DEFAULT_APP_VERSION, + help='The WhatsApp version to use for crypt15 encryption. Default:' + + C.DEFAULT_APP_VERSION) + parser.add_argument('--jid', type=str, default=C.DEFAULT_JID_SUFFIX, + help='The last 2 numbers of your phone number. Default: 00') + parser.add_argument('--backup-version', type=int, default=C.DEFAULT_BACKUP_VERSION, + help='The backup version to use in the header of the encrypted file. Default: 0') + parser.add_argument('--no-compress', action='store_true', + help='Do not compress the file. This will make the backup not working. Only used in development. Default: false') return parser.parse_args() + def main(): """Main function""" # Parse the command line arguments args = parsecmdline() # set wa_crypt_tools l to debug - l.setLevel(logging.DEBUG if args.verbose else logging.INFO) + log.setLevel(logging.DEBUG if args.verbose else logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG if args.verbose else logging.INFO) ch.setFormatter(CustomFormatter()) - l.addHandler(ch) - l.warning("This script is in beta stage") - + log.addHandler(ch) + log.warning("This script is in beta stage") # Read the key file key = KeyFactory.new(args.keyfile) @@ -67,7 +79,7 @@ def main(): iv = bytes.fromhex(args.iv) # Create the props object from the command line arguments props = Props(wa_version=args.wa_version, jid=args.jid, max_feature=args.max_feature, - features=args.enable_features, backup_version=args.backup_version) + features=args.enable_features, backup_version=args.backup_version) else: reference = DatabaseFactory.from_file(args.reference) iv: bytes = reference.get_iv() @@ -86,7 +98,7 @@ def main(): encrypted = db.encrypt(key, props, compressed) args.encrypted.write(encrypted) # Close the files - l.info("Done!") + log.info("Done!") args.decrypted.close() args.encrypted.close() diff --git a/src/wa_crypt_tools/waguess.py b/src/wa_crypt_tools/waguess.py index 4153308..ff9959c 100644 --- a/src/wa_crypt_tools/waguess.py +++ b/src/wa_crypt_tools/waguess.py @@ -15,7 +15,7 @@ from wa_crypt_tools.lib.logformat import CustomFormatter from wa_crypt_tools.lib.utils import test_decompression -l = logging.getLogger(__name__) +log = logging.getLogger(__name__) # AES import party! # pycryptodome and PyCryptodomex's implementations of AES are the same, @@ -32,18 +32,17 @@ if not hasattr(AES, 'MODE_GCM'): # pycrypto raise ModuleNotFoundError("You installed pycrypto and not pycryptodome(x). " - "Pycrypto is old, deprecated and not supported. \n" - "Run: python -m pip uninstall pycrypto\n" - "And: python -m pip install pycryptodomex\n" - "Or: python -m pip install pycryptodome") + "Pycrypto is old, deprecated and not supported. \n" + "Run: python -m pip uninstall pycrypto\n" + "And: python -m pip install pycryptodomex\n" + "Or: python -m pip install pycryptodome") except ModuleNotFoundError: # crypto (or nothing) raise ModuleNotFoundError("You need pycryptodome(x) to run these scripts!\n" - "python -m pip install pycryptodome\n" - "Or: python -m pip install pycryptodome\n" - "You can also remove \"crypto\" if you have it installed\n" - "python -m pip uninstall crypto") - + "python -m pip install pycryptodome\n" + "Or: python -m pip install pycryptodome\n" + "You can also remove \"crypto\" if you have it installed\n" + "python -m pip uninstall crypto") def oscillate(n: int, n_min: int, n_max: int): @@ -66,30 +65,32 @@ def oscillate(n: int, n_min: int, n_max: int): if i == n_max: break yield i - i = i - c - c = c + 1 + i -= c + c += 1 if i == 0 or i == n_min: break yield i - i = i + c - c = c + 1 + i += c + c += 1 # Second phase (range of remaining numbers) # n != i/2 fixes a bug where we would yield min and max two times if n == (max-min)/2 if i == n_min and n != i / 2: yield i - i = i + c + i += c for j in range(i, n_max + 1): yield j if i == n_max and n != i / 2: yield n_max - i = i - c + i -= c for j in range(i, n_min - 1, -1): yield j + + def find_data_offset(header: bytes, iv_offset: int, key: bytes, starting_data_offset: int) -> int: """Tries to find the offset in which the encrypted data starts. Returns the offset or -1 if the offset is not found. @@ -131,30 +132,30 @@ def guess_offsets(key: bytes, encrypted: io.BufferedReader, def_iv_offset: int, db_header = encrypted.read(C.HEADER_SIZE) if len(db_header) < C.HEADER_SIZE: - l.fatal("The encrypted database is too small.\n " - "Did you swap the keyfile and the encrypted database file by mistake?") + log.fatal("The encrypted database is too small.\n " + "Did you swap the keyfile and the encrypted database file by mistake?") try: if db_header[:15].decode('ascii') == 'SQLite format 3': - l.error("The database file is not encrypted.\n " - "Did you swap the input and the output files by mistake?") + log.error("The database file is not encrypted.\n " + "Did you swap the input and the output files by mistake?") except ValueError: pass # Finding WhatsApp's version is nice version = findall(b"\\d(?:\\.\\d{1,3}){3}", db_header) if len(version) != 1: - l.info('WhatsApp version not found (Crypt12?)') + log.info('WhatsApp version not found (Crypt12?)') else: - l.debug("WhatsApp version: {}".format(version[0].decode('ascii'))) + log.debug("WhatsApp version: {}".format(version[0].decode('ascii'))) # Determine IV offset and data offset. for iv_offset in oscillate(n=def_iv_offset, n_min=0, n_max=C.HEADER_SIZE - 128): data_offset = find_data_offset(db_header, iv_offset, key, def_data_offset) if data_offset != -1: - l.info("Offsets guessed (IV: {}, data: {}).".format(iv_offset, data_offset)) + log.info("Offsets guessed (IV: {}, data: {}).".format(iv_offset, data_offset)) if iv_offset != def_iv_offset or data_offset != def_data_offset: - l.info("Next time, use -ivo {} -do {} for guess-free decryption".format(iv_offset, data_offset)) + log.info("Next time, use -ivo {} -do {} for guess-free decryption".format(iv_offset, data_offset)) break if data_offset == -1: return None @@ -194,7 +195,7 @@ def decrypt(cipher, encrypted, decrypted): z_obj = zlib.decompressobj() if cipher is None: - l.fatal("Could not create a decryption cipher") + log.fatal("Could not create a decryption cipher") try: @@ -207,67 +208,68 @@ def decrypt(cipher, encrypted, decrypted): try: output_decrypted: bytearray = cipher.decrypt(encrypted_data) except ValueError as e: - l.fatal("Decryption failed: {}." - "\n This probably means your backup is corrupted.".format(e)) + log.fatal("Decryption failed: {}." + "\n This probably means your backup is corrupted.".format(e)) # Dead code to make pycharm warning go away exit(1) - try: output_file = z_obj.decompress(output_decrypted) if not z_obj.eof: - l.error("The encrypted database file is truncated (damaged).") + log.error("The encrypted database file is truncated (damaged).") except zlib.error: output_file = output_decrypted if test_decompression(output_file[:io.DEFAULT_BUFFER_SIZE]): - l.info("Decrypted data is a ZIP file that I will not decompress automatically.") + log.info("Decrypted data is a ZIP file that I will not decompress automatically.") else: - l.error("I can't recognize decrypted data. Decryption not successful.\n " - "The key probably does not match with the encrypted file.\n " - "Or the backup is simply empty. (check with --force)") + log.error("I can't recognize decrypted data. Decryption not successful.\n " + "The key probably does not match with the encrypted file.\n " + "Or the backup is simply empty. (check with --force)") decrypted.write(output_file) except MemoryError: - l.fatal("Out of RAM, please use -nm.") + log.fatal("Out of RAM, please use -nm.") decrypted.flush() except OSError as e: - l.fatal("I/O error: {}".format(e)) + log.fatal("I/O error: {}".format(e)) finally: decrypted.close() encrypted.close() -def main(): + +def main(): args = parsecmdline() # set wa_crypt_tools l to debug - l.setLevel(logging.DEBUG if args.verbose else logging.INFO) + log.setLevel(logging.DEBUG if args.verbose else logging.INFO) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG if args.verbose else logging.INFO) ch.setFormatter(CustomFormatter()) - l.addHandler(ch) + log.addHandler(ch) if not (0 < args.data_offset < C.HEADER_SIZE - 128): - l.fatal("The data offset must be between 1 and {}".format(C.HEADER_SIZE - 129)) + log.fatal("The data offset must be between 1 and {}".format(C.HEADER_SIZE - 129)) if not (0 < args.iv_offset < C.HEADER_SIZE - 128): - l.fatal("The IV offset must be between 1 and {}".format(C.HEADER_SIZE - 129)) + log.fatal("The IV offset must be between 1 and {}".format(C.HEADER_SIZE - 129)) # Get the decryption key from the key file or the hex encoded string. key = KeyFactory.new(args.keyfile) - l.debug(str(key)) + log.debug(str(key)) cipher = guess_offsets(key=key.get(), encrypted=args.encrypted, - def_iv_offset=args.iv_offset, def_data_offset=args.data_offset) + def_iv_offset=args.iv_offset, def_data_offset=args.data_offset) decrypt(cipher, args.encrypted, args.decrypted) if date.today().day == 1 and date.today().month == 4: - l.info("Done. Uploading messages to the developer's server...") + log.info("Done. Uploading messages to the developer's server...") sleep(0.5) - l.info("Uploaded. The developer will now read and publish your messages!") + log.info("Uploaded. The developer will now read and publish your messages!") else: - l.info("Done") + log.info("Done") + if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/src/wa_crypt_tools/wainfo.py b/src/wa_crypt_tools/wainfo.py index b7aada6..0f2f5e8 100644 --- a/src/wa_crypt_tools/wainfo.py +++ b/src/wa_crypt_tools/wainfo.py @@ -17,7 +17,8 @@ __status__ = 'Beta' import logging -l = logging.getLogger(__name__) + +log = logging.getLogger(__name__) def parsecmdline() -> argparse.Namespace: @@ -32,31 +33,32 @@ def parsecmdline() -> argparse.Namespace: help='tell the program that the file is a key file') return parser.parse_args() + def main(): args = parsecmdline() # set wa_crypt_tools l to debug - l.setLevel(logging.DEBUG) + log.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(CustomFormatter()) - l.addHandler(ch) + log.addHandler(ch) # also add to "wa_crypt_tools.lib" logger logging.getLogger("wa_crypt_tools.lib").addHandler(ch) logging.getLogger("wa_crypt_tools.lib").setLevel(logging.DEBUG) - l.warning("This script is in beta stage.") + log.warning("This script is in beta stage.") - if (args.key): + if args.key: key = KeyFactory.from_file(args.encrypted) print(key) return try: - DatabaseFactory.from_file(open(args.encrypted,'rb')) + DatabaseFactory.from_file(open(args.encrypted, 'rb')) except Exception as e: - l.error("Error: {}".format(e)) - return - # TODO + log.error("Error: {}".format(e)) + return + # TODO # print(db) diff --git a/tests/lib/db/test_db.py b/tests/lib/db/test_db.py index 057b0bc..f0d12c2 100644 --- a/tests/lib/db/test_db.py +++ b/tests/lib/db/test_db.py @@ -1,5 +1,3 @@ -from unittest import TestCase - -class TestDatabase(TestCase): +class TestDatabase(): pass diff --git a/tests/lib/test_constants.py b/tests/lib/test_constants.py index 11ad74b..93956ba 100644 --- a/tests/lib/test_constants.py +++ b/tests/lib/test_constants.py @@ -3,4 +3,4 @@ class TestConstants: def test_zip_header(self): - assert C.ZIP_HEADER == b'PK\x03\x04' \ No newline at end of file + assert C.ZIP_HEADER == b'PK\x03\x04' diff --git a/tests/lib/test_utils.py b/tests/lib/test_utils.py index 0bddb9c..5c7b1d8 100644 --- a/tests/lib/test_utils.py +++ b/tests/lib/test_utils.py @@ -1,7 +1,7 @@ from wa_crypt_tools.lib.utils import hexstring2bytes -class Test_Utils: +class TestUtils: # Sample test to test the test infrastructure (!) def test_hexstring2bytes(self): assert hexstring2bytes("0"*64) == b'\x00' * 32 diff --git a/tests/test_createkey.py b/tests/test_createkey.py index b78e9c5..b1e19c2 100644 --- a/tests/test_createkey.py +++ b/tests/test_createkey.py @@ -7,29 +7,30 @@ from wa_crypt_tools.lib.props import Props from hashlib import sha512 -class Test_CreateKey: + +class TestCreatekey: def test_createkey(self): key: Key15 = Key15(key= - bytes.fromhex( - '6730a595a1484d0c39c101dc0ac82ec5e401bb6f0e1b8ee2dc104a6b3687f017' - )) + bytes.fromhex( + '6730a595a1484d0c39c101dc0ac82ec5e401bb6f0e1b8ee2dc104a6b3687f017' + )) keyb: bytes = key.dump() keyb_digest = sha512(keyb).digest() with open("tests/res/encrypted_backup.key", 'rb') as f: orig_check = sha512(f.read()).digest() assert keyb_digest == orig_check - + def test_createkey14(self): key: Key14 = Key14(key= - bytes.fromhex( - '3a146d9bbd8b6311d962c71619c0c2cce3ce694ea4a0f3f600e271380e1226c6' - ), - serversalt=bytes.fromhex('cd788b1b4625f50d3fccdeac94e1ff638899733b77a224ff614918363901f044'), - googleid=bytes.fromhex('92683e735c88727eef9486911f3ac6fa'), - key_version=b'\x02', - cipher_version=b'\x00\x01') + bytes.fromhex( + '3a146d9bbd8b6311d962c71619c0c2cce3ce694ea4a0f3f600e271380e1226c6' + ), + serversalt=bytes.fromhex('cd788b1b4625f50d3fccdeac94e1ff638899733b77a224ff614918363901f044'), + googleid=bytes.fromhex('92683e735c88727eef9486911f3ac6fa'), + key_version=b'\x02', + cipher_version=b'\x00\x01') keyb: bytes = key.dump() keyb_digest = sha512(keyb).digest() with open("tests/res/key", 'rb') as f: orig_check = sha512(f.read()).digest() - assert keyb_digest == orig_check \ No newline at end of file + assert keyb_digest == orig_check diff --git a/tests/test_decrypt.py b/tests/test_decrypt.py index 59a5d10..37d9b41 100644 --- a/tests/test_decrypt.py +++ b/tests/test_decrypt.py @@ -9,7 +9,7 @@ from wa_crypt_tools.lib.props import Props from hashlib import sha512 -class Test_Decryption: +class TestDecryption: def test_decryption15(self): key = KeyFactory.new("tests/res/encrypted_backup.key") f = open("tests/res/msgstore.db.crypt15",'rb') diff --git a/tests/test_encrypt.py b/tests/test_encrypt.py index 0c72ef5..75d1dc6 100644 --- a/tests/test_encrypt.py +++ b/tests/test_encrypt.py @@ -8,19 +8,21 @@ from wa_crypt_tools.lib.props import Props from hashlib import sha512 -class Test_Encryption: + +class TestEncryption: def test_encryption15(self): key = KeyFactory.new("tests/res/encrypted_backup.key") - props = Props(wa_version="2.22.5.13", jid="67", features=[5,7,8,13,14,19,22,25,28,30,31,32,36,37], max_feature=37) + props = Props(wa_version="2.22.5.13", jid="67", features=[5, 7, 8, 13, 14, 19, 22, 25, 28, 30, 31, 32, 36, 37], + max_feature=37) db = Database15(key=key, iv=bytes.fromhex("C395EE009CF8B68AC0EA760550F6559C")) data = db.encrypt( key, - props, + props, zlib.compress( open("tests/res/msgstore.db", 'rb').read(), level=1, - ) ) + ) new_check = sha512(data).digest() with open("tests/res/msgstore-new.db.crypt15", 'wb') as f: f.write(data) @@ -31,7 +33,8 @@ def test_encryption15(self): def test_encryption14(self): key = KeyFactory.new("tests/res/key") - props = Props(wa_version="2.22.5.13", jid="67", features=[5,7,8,13,14,19,22,25,28,30,31,32,36,37], max_feature=37) + props = Props(wa_version="2.22.5.13", jid="67", features=[5, 7, 8, 13, 14, 19, 22, 25, 28, 30, 31, 32, 36, 37], + max_feature=37) db = Database14(key=key, iv=bytes.fromhex("EA53CEAE36ECAB50BC331AEB62491625")) data = db.encrypt( key, @@ -39,8 +42,8 @@ def test_encryption14(self): zlib.compress( open("tests/res/msgstore.db", 'rb').read(), level=1, - ) ) + ) new_check = sha512(data).digest() with open("tests/res/msgstore-new.db.crypt14", 'wb') as f: f.write(data) @@ -59,8 +62,8 @@ def test_encryption14_noexpiry(self): zlib.compress( open("tests/res/msgstore.db", 'rb').read(), level=1, - ) ) + ) new_check = sha512(data).digest() with open("tests/res/msgstore-new.db.crypt14", 'wb') as f: f.write(data) @@ -79,12 +82,12 @@ def test_encryption12(self): zlib.compress( open("tests/res/msgstore.db", 'rb').read(), level=1, - ) ) + ) new_check = sha512(data).digest() with open("tests/res/msgstore-new.db.crypt12", 'wb') as f: f.write(data) with open("tests/res/msgstore.db.crypt12", 'rb') as f: orig_check = sha512(f.read()).digest() assert new_check == orig_check - os.remove("tests/res/msgstore-new.db.crypt12") \ No newline at end of file + os.remove("tests/res/msgstore-new.db.crypt12")