Skip to content

Commit

Permalink
Various formatting changes
Browse files Browse the repository at this point in the history
New PyCharm version: new checks
  • Loading branch information
ElDavoo committed May 24, 2024
1 parent 92fd016 commit 5cd2ed6
Show file tree
Hide file tree
Showing 20 changed files with 291 additions and 274 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ Linux:

`PATH="$(pwd):$PATH" protol --in-place --python-out ../src/wa_crypt_tools/proto protoc --proto-path=. *.proto`

Now all of the generated python classes should have their imports fixed.
Now all the generated python classes should have their imports fixed.

---

Expand Down
3 changes: 2 additions & 1 deletion src/wa_crypt_tools/lib/db/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from wa_crypt_tools.lib.key.key import Key
from wa_crypt_tools.lib.props import Props

l = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class Database(abc.ABC):
"""
Expand Down
42 changes: 21 additions & 21 deletions src/wa_crypt_tools/lib/db/db12.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from wa_crypt_tools.lib.key.key14 import Key14
from wa_crypt_tools.lib.props import Props

l = logging.getLogger(__name__)
log = logging.getLogger(__name__)


class Database12(Database):
"""
Implementation of a crypt12 database.
Expand Down Expand Up @@ -40,25 +42,25 @@ def __init__(self, key: Key14 = None, encrypted=None,
if encrypted and key:
self.cipher_version = encrypted.read(2)
if self.cipher_version != key.get_cipher_version():
l.error("Cipher version mismatch: {} != {}".format(self.cipher_version, key.get_cipher_version()))
log.error("Cipher version mismatch: {} != {}".format(self.cipher_version, key.get_cipher_version()))
raise ValueError
self.file_hash.update(self.cipher_version)

self.key_version = encrypted.read(1)
if self.key_version != key.get_key_version():
l.error("Key version mismatch: {} != {}".format(self.key_version, key.get_key_version()))
log.error("Key version mismatch: {} != {}".format(self.key_version, key.get_key_version()))
raise ValueError
self.file_hash.update(self.key_version)

self.serversalt = encrypted.read(32)
if self.serversalt != key.get_serversalt():
l.error("Server salt mismatch: {} != {}".format(self.serversalt, key.get_serversalt()))
log.error("Server salt mismatch: {} != {}".format(self.serversalt, key.get_serversalt()))
raise ValueError
self.file_hash.update(self.serversalt)

self.googleid = encrypted.read(16)
if self.googleid != key.get_googleid():
l.error("Google ID mismatch: {} != {}".format(self.googleid, key.get_googleid()))
log.error("Google ID mismatch: {} != {}".format(self.googleid, key.get_googleid()))
self.file_hash.update(self.googleid)

self.iv = encrypted.read(16)
Expand Down Expand Up @@ -106,7 +108,7 @@ def __init__(self, key: Key14 = None, encrypted=None,
self.cipher_version = cipher_version
self.file_hash.update(self.cipher_version)
else:
l.error("Unsupported cipher version provided!")
log.error("Unsupported cipher version provided!")
raise ValueError
else:
self.cipher_version = C.SUPPORTED_CIPHER_VERSION
Expand All @@ -117,7 +119,7 @@ def __init__(self, key: Key14 = None, encrypted=None,
self.key_version = key_version
self.file_hash.update(self.key_version)
else:
l.error("Unsupported key version provided!")
log.error("Unsupported key version provided!")
else:
self.key_version = C.SUPPORTED_KEY_VERSIONS[-1]
self.file_hash.update(self.key_version)
Expand Down Expand Up @@ -154,9 +156,9 @@ def decrypt(self, key: Key14, encrypted: bytes) -> bytes:
crypt12_footer = str(userjid)
jid = findall(r"(?:-|\d)(?:-|\d)(\d\d)", crypt12_footer)
if len(jid) != 1:
l.error("The phone number end is not 2 characters long")
log.error("The phone number end is not 2 characters long")
else:
l.debug("Your phone number ends with {}".format(jid[0]))
log.debug("Your phone number ends with {}".format(jid[0]))
checksum = encrypted[-20:-4]
authentication_tag = encrypted[-36:-20]
encrypted_data = encrypted[:-36]
Expand All @@ -170,14 +172,14 @@ def decrypt(self, key: Key14, encrypted: bytes) -> bytes:
# TODO do crypt12 multifiles actually exist?
is_multifile_backup = True
else:
l.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest()))
log.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest()))

cipher = AES.new(key.get(), AES.MODE_GCM, self.iv)
try:
output_decrypted: bytes = cipher.decrypt(encrypted_data)
except ValueError as e:
l.fatal("Decryption failed: {}."
"\n This probably means your backup is corrupted.".format(e))
log.fatal("Decryption failed: {}."
"\n This probably means your backup is corrupted.".format(e))
raise e

# Verify the authentication tag
Expand All @@ -193,12 +195,13 @@ def decrypt(self, key: Key14, encrypted: bytes) -> bytes:
else:
cipher.verify(authentication_tag)
except ValueError as e:
l.error("Authentication tag mismatch: {}."
"\n This probably means your backup is corrupted.".format(e))
log.error("Authentication tag mismatch: {}."
"\n This probably means your backup is corrupted.".format(e))

return output_decrypted

def encrypt(self, key: Key14, props: Props, decrypted: bytes) -> bytes:
hash = md5()
file_hash = md5()
out = b""
out += self.cipher_version
out += self.key_version
Expand All @@ -209,16 +212,13 @@ def encrypt(self, key: Key14, props: Props, decrypted: bytes) -> bytes:
encrypted = cipher.encrypt(decrypted)
out += encrypted
out += cipher.digest()
hash.update(out)
out += hash.digest()
file_hash.update(out)
out += file_hash.digest()
jid = props.get_jid()
if len(jid) != 2:
l.error("The phone number end is not 2 characters long")
log.error("The phone number end is not 2 characters long")
out += "--{}".format(jid).encode()
return out



def get_iv(self) -> bytes:
return self.iv

59 changes: 30 additions & 29 deletions src/wa_crypt_tools/lib/db/db15.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@

from wa_crypt_tools.lib.props import Props

l = logging.getLogger(__name__)
log = logging.getLogger(__name__)

from wa_crypt_tools.lib.db.db import Database
from wa_crypt_tools.lib.key.key15 import Key15


class Database15(Database):
def __str__(self):
return "Database15"
Expand All @@ -28,23 +29,23 @@ def __init__(self, *, key: Key15 = None, encrypted=None, iv: bytes = None
from wa_crypt_tools.proto import backup_prefix_pb2 as prefix
from wa_crypt_tools.proto import key_type_pb2 as key_type
except ImportError as e:
l.error("Could not import the proto classes: {}".format(e))
log.error("Could not import the proto classes: {}".format(e))
if str(e).startswith("cannot import name 'builder' from 'google.protobuf.internal'"):
l.error("You need to upgrade the protobuf library to at least 3.20.0.\n"
" python -m pip install --upgrade protobuf")
log.error("You need to upgrade the protobuf library to at least 3.20.0.\n"
" python -m pip install --upgrade protobuf")
elif str(e).startswith("no module named"):
l.error("Please download them and put them in the \"proto\" sub folder.")
log.error("Please download them and put them in the \"proto\" sub folder.")
raise e
except AttributeError as e:
l.error("Could not import the proto classes: {}\n ".format(e) +
"Your protobuf library is probably too old.\n "
"Please upgrade to at least version 3.20.0 , by running:\n "
"python -m pip install --upgrade protobuf")
log.error("Could not import the proto classes: {}\n ".format(e) +
"Your protobuf library is probably too old.\n "
"Please upgrade to at least version 3.20.0 , by running:\n "
"python -m pip install --upgrade protobuf")
raise e

self.header = prefix.BackupPrefix()

l.debug("Parsing database header...")
log.debug("Parsing database header...")

try:

Expand All @@ -62,55 +63,54 @@ def __init__(self, *, key: Key15 = None, encrypted=None, iv: bytes = None
else:
self.file_hash.update(encrypted.read(1))
if not msgstore_features_flag:
l.debug("No feature table found (not a msgstore DB or very old)")
log.debug("No feature table found (not a msgstore DB or very old)")

try:

protobuf_raw = encrypted.read(protobuf_size)
self.file_hash.update(protobuf_raw)

if self.header.ParseFromString(protobuf_raw) != protobuf_size:
l.error("Protobuf message not fully read. Please report a bug.")
log.error("Protobuf message not fully read. Please report a bug.")
else:

# Checking and printing WA version and phone number
version = findall(r"\d(?:\.\d{1,3}){3}", self.header.info.app_version)
if len(version) != 1:
l.error('WhatsApp version not found')
log.error('WhatsApp version not found')
else:
l.debug("WhatsApp version: {}".format(version[0]))
log.debug("WhatsApp version: {}".format(version[0]))
if len(self.header.info.jidSuffix) != 2:
l.error("The phone number end is not 2 characters long")
l.debug("Your phone number ends with {}".format(self.header.info.jidSuffix))
log.error("The phone number end is not 2 characters long")
log.debug("Your phone number ends with {}".format(self.header.info.jidSuffix))

if len(self.header.c15_iv.IV) != 0:
# DB Header is crypt15
# if type(key) is not Key15:
# l.error("You are using a crypt14 key file with a crypt15 backup.")
if len(self.header.c15_iv.IV) != 16:
l.error("IV is not 16 bytes long but is {} bytes long".format(len(self.header.c15_iv.IV)))
log.error(
"IV is not 16 bytes long but is {} bytes long".format(len(self.header.c15_iv.IV)))
iv = self.header.c15_iv.IV

elif len(self.header.c14_cipher.IV) != 0:
raise ValueError("Crypt14 file in crypt15 constructor!")
else:
l.error("Could not parse the IV from the protobuf message. Please report a bug.")
log.error("Could not parse the IV from the protobuf message. Please report a bug.")
raise ValueError



except DecodeError as e:

l.error("Could not parse the protobuf message: {}".format(e))
log.error("Could not parse the protobuf message: {}".format(e))
raise e

except OSError as e:
l.fatal("Reading database header failed: {}".format(e))
log.fatal("Reading database header failed: {}".format(e))
raise e
else:
if iv:
if len(iv) != 16:
l.error("IV is not 16 bytes long but is {} bytes long".format(len(iv)))
log.error("IV is not 16 bytes long but is {} bytes long".format(len(iv)))
self.iv = iv
else:
self.iv = urandom(16)
Expand All @@ -129,14 +129,14 @@ def decrypt(self, key: Key15, encrypted: bytes) -> bytes:
# We are probably in a multifile backup, which does not have a checksum.
is_multifile_backup = True
else:
l.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest()))
log.debug("Checksum OK ({}). Decrypting...".format(self.file_hash.hexdigest()))

cipher = AES.new(key.get(), AES.MODE_GCM, self.iv)
try:
output_decrypted: bytes = cipher.decrypt(encrypted_data)
except ValueError as e:
l.fatal("Decryption failed: {}."
"\n This probably means your backup is corrupted.".format(e))
log.fatal("Decryption failed: {}."
"\n This probably means your backup is corrupted.".format(e))
raise e

# Verify the authentication tag
Expand All @@ -152,8 +152,8 @@ def decrypt(self, key: Key15, encrypted: bytes) -> bytes:
else:
cipher.verify(authentication_tag)
except ValueError as e:
l.error("Authentication tag mismatch: {}."
"\n This probably means your backup is corrupted.".format(e))
log.error("Authentication tag mismatch: {}."
"\n This probably means your backup is corrupted.".format(e))

return output_decrypted

Expand Down Expand Up @@ -186,5 +186,6 @@ def encrypt(self, key: Key15, props: Props, decrypted: bytes) -> bytes:
file_hash.update(authentication_tag)
out += file_hash.digest()
return out

def get_iv(self) -> bytes:
return self.iv
return self.iv
Loading

0 comments on commit 5cd2ed6

Please sign in to comment.