Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Support for WhatsApp Business (iPhone only) #60

Merged
merged 12 commits into from
Sep 22, 2023
26 changes: 21 additions & 5 deletions Whatsapp_Chat_Exporter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import string
import glob
from Whatsapp_Chat_Exporter import extract_exported, extract_iphone
from Whatsapp_Chat_Exporter import extract, extract_iphone_media
from Whatsapp_Chat_Exporter import extract, extract_iphone_media, extract_iphone_media_smb
from Whatsapp_Chat_Exporter.data_model import ChatStore
from Whatsapp_Chat_Exporter.utility import Crypt, check_update, import_from_json
from argparse import ArgumentParser, SUPPRESS
Expand Down Expand Up @@ -177,6 +177,13 @@ def main():
action='store_true',
help="Import JSON file and convert to HTML output"
)
parser.add_argument(
"--smb",
dest="smb",
default=False,
action='store_true',
help="Use Whatsapp Business default files (iphone only)"
)
args = parser.parse_args()

# Check for updates
Expand Down Expand Up @@ -265,14 +272,23 @@ def main():
vcard = extract_iphone.vcard
create_html = extract.create_html
if args.media is None:
args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
if args.smb:
args.media = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
else:
args.media = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared"
if args.backup is not None:
if not os.path.isdir(args.media):
extract_iphone_media.extract_media(args.backup)
if args.smb:
extract_iphone_media_smb.extract_media(args.backup)
else:
extract_iphone_media.extract_media(args.backup)
else:
print("WhatsApp directory already exists, skipping WhatsApp file extraction.")
if args.db is None:
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
if args.smb:
msg_db = "724bd3b98b18518b455a87c1f3ac3a0d189c4466"
else:
msg_db = "7c7fba66680ef796b916b067077cc246adacf01d"
else:
msg_db = args.db
if args.wa is None:
Expand All @@ -290,7 +306,7 @@ def main():
db.row_factory = sqlite3.Row
messages(db, data, args.media)
media(db, data, args.media)
vcard(db, data)
vcard(db, data, args.media)
KnugiHK marked this conversation as resolved.
Show resolved Hide resolved
if args.android:
extract.calls(db, data)
if not args.no_html:
Expand Down
10 changes: 5 additions & 5 deletions Whatsapp_Chat_Exporter/extract_iphone.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def media(db, data, media_folder):
f"Processing media...({total_row_number}/{total_row_number})", end="\r")


def vcard(db, data):
def vcard(db, data, media_folder):
c = db.cursor()
c.execute("""SELECT DISTINCT ZWAVCARDMENTION.ZMEDIAITEM,
ZWAMEDIAITEM.ZMESSAGE,
Expand All @@ -260,13 +260,13 @@ def vcard(db, data):
contents = c.fetchall()
total_row_number = len(contents)
print(f"\nProcessing vCards...(0/{total_row_number})", end="\r")
base = "AppDomainGroup-group.net.whatsapp.WhatsApp.shared/Message/vCards"
if not os.path.isdir(base):
Path(base).mkdir(parents=True, exist_ok=True)
path = f'{media_folder}/Message/vCards'
if not os.path.isdir(path):
Path(path).mkdir(parents=True, exist_ok=True)
for index, content in enumerate(contents):
file_name = "".join(x for x in content["ZVCARDNAME"] if x.isalnum())
file_name = file_name.encode('utf-8')[:230].decode('utf-8', 'ignore')
file_path = os.path.join(base, f"{file_name}.vcf")
file_path = os.path.join(path, f"{file_name}.vcf")
if not os.path.isfile(file_path):
with open(file_path, "w", encoding="utf-8") as f:
f.write(content["ZVCARDSTRING"])
Expand Down
126 changes: 126 additions & 0 deletions Whatsapp_Chat_Exporter/extract_iphone_media_smb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/python3

import shutil
import sqlite3
import os
import time
import getpass
import threading
try:
from iphone_backup_decrypt import EncryptedBackup, RelativePath
from iphone_backup_decrypt import FailedToDecryptError, Domain
except ModuleNotFoundError:
support_encrypted = False
else:
support_encrypted = True


def extract_encrypted(base_dir, password):
backup = EncryptedBackup(backup_directory=base_dir, passphrase=password, cleanup=False, check_same_thread=False)
print("Decrypting WhatsApp database...")
try:
backup.extract_file(relative_path=RelativePath.WHATSAPP_MESSAGES,
output_filename="724bd3b98b18518b455a87c1f3ac3a0d189c4466")
backup.extract_file(relative_path=RelativePath.WHATSAPP_CONTACTS,
output_filename="d7246a707f51ddf8b17ee2dddabd9e0a4da5c552")
except FailedToDecryptError:
print("Failed to decrypt backup: incorrect password?")
exit()
extract_thread = threading.Thread(
target=backup.extract_files_by_domain,
args=(Domain.WHATSAPP, Domain.WHATSAPP)
)
extract_thread.daemon = True
extract_thread.start()
dot = 0
while extract_thread.is_alive():
print(f"Decrypting and extracting files{'.' * dot}{' ' * (3 - dot)}", end="\r")
if dot < 3:
dot += 1
time.sleep(0.5)
else:
dot = 0
time.sleep(0.4)
print(f"All required files decrypted and extracted.", end="\n")
extract_thread.handled = True
return backup


def is_encrypted(base_dir):
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as f:
c = f.cursor()
try:
c.execute("""SELECT count()
FROM Files
""")
except sqlite3.OperationalError as e:
raise e # These error cannot be used to determine if the backup is encrypted
except sqlite3.DatabaseError:
return True
else:
return False


def extract_media(base_dir):
if is_encrypted(base_dir):
if not support_encrypted:
print("You don't have the dependencies to handle encrypted backup.")
print("Read more on how to deal with encrypted backup:")
print("https://github.com/KnugiHK/Whatsapp-Chat-Exporter/blob/main/README.md#usage")
return False
print("Encryption detected on the backup!")
password = getpass.getpass("Enter the password for the backup:")
extract_encrypted(base_dir, password)
else:
wts_db = os.path.join(base_dir, "72/724bd3b98b18518b455a87c1f3ac3a0d189c4466")
contact_db = os.path.join(base_dir, "d7/d7246a707f51ddf8b17ee2dddabd9e0a4da5c552")
if not os.path.isfile(wts_db):
print("WhatsApp database not found.")
exit()
else:
shutil.copyfile(wts_db, "724bd3b98b18518b455a87c1f3ac3a0d189c4466")
if not os.path.isfile(contact_db):
print("Contact database not found.")
exit()
else:
shutil.copyfile(contact_db, "d7246a707f51ddf8b17ee2dddabd9e0a4da5c552")
_wts_id = "AppDomainGroup-group.net.whatsapp.WhatsAppSMB.shared"
with sqlite3.connect(os.path.join(base_dir, "Manifest.db")) as manifest:
manifest.row_factory = sqlite3.Row
c = manifest.cursor()
c.execute(
f"""SELECT count()
FROM Files
WHERE domain = '{_wts_id}'"""
)
total_row_number = c.fetchone()[0]
print(f"Extracting WhatsApp files...(0/{total_row_number})", end="\r")
c.execute(f"""SELECT fileID,
relativePath,
flags,
ROW_NUMBER() OVER(ORDER BY relativePath) AS _index
FROM Files
WHERE domain = '{_wts_id}'
ORDER BY relativePath""")
if not os.path.isdir(_wts_id):
os.mkdir(_wts_id)
row = c.fetchone()
while row is not None:
if row["relativePath"] == "":
row = c.fetchone()
continue
destination = os.path.join(_wts_id, row["relativePath"])
hashes = row["fileID"]
folder = hashes[:2]
flags = row["flags"]
if flags == 2:
try:
os.mkdir(destination)
except FileExistsError:
pass
elif flags == 1:
shutil.copyfile(os.path.join(base_dir, folder, hashes), destination)
if row["_index"] % 100 == 0:
print(f"Extracting WhatsApp files...({row['_index']}/{total_row_number})", end="\r")
row = c.fetchone()
print(f"Extracting WhatsApp files...({total_row_number}/{total_row_number})", end="\n")