diff --git a/mirobo/android_backup.py b/mirobo/android_backup.py new file mode 100755 index 000000000..d32bb16b9 --- /dev/null +++ b/mirobo/android_backup.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# This is a fork from original work by BlueC0re +# https://github.com/bluec0re/android-backup-tools +# License: GPLv3 +import tarfile +import zlib +import enum +import io +import pickle +import os +import binascii + +try: + from Crypto.Cipher import AES + from Crypto.Protocol.KDF import PBKDF2 + from Crypto import Random +except ImportError: + AES = None + + +class CompressionType(enum.IntEnum): + NONE = 0 + ZLIB = 1 + + +class EncryptionType(enum.Enum): + NONE = 'none' + AES256 = 'AES-256' + + +class AndroidBackup: + def __init__(self, fname=None): + if fname: + self.open(fname) + + def open(self, fname, mode='rb'): + self.fp = open(fname, mode) + + def close(self): + self.fp.close() + + def parse(self): + self.fp.seek(0) + magic = self.fp.readline() + assert magic == b'ANDROID BACKUP\n' + self.version = int(self.fp.readline().strip()) + self.compression = CompressionType(int(self.fp.readline().strip())) + self.encryption = EncryptionType(self.fp.readline().strip().decode()) + + def is_encrypted(self): + return self.encryption == EncryptionType.AES256 + + def _decrypt(self, enc, password): + if AES is None: + raise ImportError("PyCrypto required") + + user_salt, enc = enc.split(b'\n', 1) + user_salt = binascii.a2b_hex(user_salt) + ck_salt, enc = enc.split(b'\n', 1) + ck_salt = binascii.a2b_hex(ck_salt) + rounds, enc = enc.split(b'\n', 1) + rounds = int(rounds) + iv, enc = enc.split(b'\n', 1) + iv = binascii.a2b_hex(iv) + master_key, enc = enc.split(b'\n', 1) + master_key = binascii.a2b_hex(master_key) + + user_key = PBKDF2(password, user_salt, dkLen=256//8, count=rounds) + cipher = AES.new(user_key, + mode=AES.MODE_CBC, + IV=iv) + + master_key = list(cipher.decrypt(master_key)) + l = master_key.pop(0) + master_iv = bytes(master_key[:l]) + master_key = master_key[l:] + l = master_key.pop(0) + mk = bytes(master_key[:l]) + master_key = master_key[l:] + l = master_key.pop(0) + master_ck = bytes(master_key[:l]) + + # gen checksum + + # double encode utf8 + utf8mk = self.encode_utf8(mk) + calc_ck = PBKDF2(utf8mk, ck_salt, dkLen=256//8, count=rounds) + assert calc_ck == master_ck + + cipher = AES.new(mk, + mode=AES.MODE_CBC, + IV=master_iv) + + dec = cipher.decrypt(enc) + pad = dec[-1] + + return dec[:-pad] + + @staticmethod + def encode_utf8(mk): + utf8mk = mk.decode('raw_unicode_escape') + utf8mk = list(utf8mk) + for i in range(len(utf8mk)): + c = ord(utf8mk[i]) + # fix java encoding (add 0xFF00 to non ascii chars) + if 0x7f < c < 0x100: + c += 0xff00 + utf8mk[i] = chr(c) + return ''.join(utf8mk).encode('utf-8') + + def _encrypt(self, dec, password): + if AES is None: + raise ImportError("PyCrypto required") + + master_key = Random.get_random_bytes(32) + master_salt = Random.get_random_bytes(64) + user_salt = Random.get_random_bytes(64) + master_iv = Random.get_random_bytes(16) + user_iv = Random.get_random_bytes(16) + rounds = 10000 + + l = len(dec) + pad = 16 - (l % 16) + dec += bytes([pad] * pad) + cipher = AES.new(master_key, IV=master_iv, mode=AES.MODE_CBC) + enc = cipher.encrypt(dec) + + master_ck = PBKDF2(self.encode_utf8(master_key), + master_salt, dkLen=256//8, count=rounds) + + user_key = PBKDF2(password, + user_salt, dkLen=256//8, count=rounds) + + master_dec = b"\x10" + master_iv + b"\x20" + master_key + b"\x20" + master_ck + l = len(master_dec) + pad = 16 - (l % 16) + master_dec += bytes([pad] * pad) + cipher = AES.new(user_key, IV=user_iv, mode=AES.MODE_CBC) + master_enc = cipher.encrypt(master_dec) + + enc = binascii.b2a_hex(user_salt).upper() + b"\n" + \ + binascii.b2a_hex(master_salt).upper() + b"\n" + \ + str(rounds).encode() + b"\n" + \ + binascii.b2a_hex(user_iv).upper() + b"\n" + \ + binascii.b2a_hex(master_enc).upper() + b"\n" + enc + + return enc + + def read_data(self, password): + """Reads from the file and returns a TarFile object.""" + data = self.fp.read() + + if self.encryption == EncryptionType.AES256: + if password is None: + raise Exception("Password need to be provided to extract encrypted archives") + data = self._decrypt(data, password) + + if self.compression == CompressionType.ZLIB: + data = zlib.decompress(data, zlib.MAX_WBITS) + + tar = tarfile.TarFile(fileobj=io.BytesIO(data)) + return tar + + def unpack(self, target_dir=None, password=None): + tar = self.read_data(password) + + members = tar.getmembers() + + if target_dir is None: + target_dir = os.path.basename(self.fp.name) + '_unpacked' + pickle_fname = os.path.basename(self.fp.name) + '.pickle' + if not os.path.exists(target_dir): + os.mkdir(target_dir) + + tar.extractall(path=target_dir) + + with open(pickle_fname, 'wb') as fp: + pickle.dump(members, fp) + + def list(self, password=None): + tar = self.read_tar(password) + return tar.list() + + def pack(self, fname, password=None): + target_dir = os.path.basename(fname) + '_unpacked' + pickle_fname = os.path.basename(fname) + '.pickle' + + data = io.BytesIO() + tar = tarfile.TarFile(name=fname, + fileobj=data, + mode='w', + format=tarfile.PAX_FORMAT) + + with open(pickle_fname, 'rb') as fp: + members = pickle.load(fp) + + os.chdir(target_dir) + for member in members: + if member.isreg(): + tar.addfile(member, open(member.name, 'rb')) + else: + tar.addfile(member) + + tar.close() + + data.seek(0) + if self.compression == CompressionType.ZLIB: + data = zlib.compress(data.read()) + if self.encryption == EncryptionType.AES256: + data = self._encrypt(data, password) + + with open(fname, 'wb') as fp: + fp.write(b'ANDROID BACKUP\n') + fp.write('{}\n'.format(self.version).encode()) + fp.write('{:d}\n'.format(self.compression).encode()) + fp.write('{}\n'.format(self.encryption.value).encode()) + + fp.write(data) + + def __exit__(self, *args, **kwargs): + self.close() + + def __enter__(self): + self.parse() + return self diff --git a/mirobo/extract_tokens.py b/mirobo/extract_tokens.py index d848ddb85..f917f82a5 100644 --- a/mirobo/extract_tokens.py +++ b/mirobo/extract_tokens.py @@ -1,20 +1,33 @@ +import logging import click -import tarfile import tempfile import sqlite3 from Crypto.Cipher import AES from pprint import pformat as pf +import attr +from .android_backup import AndroidBackup + +logging.basicConfig(level=logging.INFO) +_LOGGER = logging.getLogger(__name__) + + +@attr.s +class DeviceConfig: + name = attr.ib() + mac = attr.ib() + ip = attr.ib() + token = attr.ib() + model = attr.ib() class BackupDatabaseReader: - def __init__(self, dump_all=False, dump_raw=False): - self.dump_all = dump_all + def __init__(self, dump_raw=False): self.dump_raw = dump_raw @staticmethod def dump_raw(dev): raw = {k: dev[k] for k in dev.keys()} - click.echo(pf(raw)) + _LOGGER.info(pf(raw)) @staticmethod def decrypt_ztoken(ztoken): @@ -29,7 +42,7 @@ def decrypt_ztoken(ztoken): return token.decode() def read_apple(self): - click.echo("Reading tokens from Apple DB") + _LOGGER.info("Reading tokens from Apple DB") c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';") for dev in c.fetchall(): if self.dump_raw: @@ -39,11 +52,12 @@ def read_apple(self): model = dev['ZMODEL'] name = dev['ZNAME'] token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN']) - if ip or self.dump_all: - click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac)) + + config = DeviceConfig(name=name, mac=mac, ip=ip, model=model, token=token) + yield config def read_android(self): - click.echo("Reading tokens from Android DB") + _LOGGER.info("Reading tokens from Android DB") c = self.conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';") for dev in c.fetchall(): if self.dump_raw: @@ -53,18 +67,16 @@ def read_android(self): model = dev['model'] name = dev['name'] token = dev['token'] - if ip or self.dump_all: - click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac)) - def dump_to_file(self, fp): - fp.open() - self.db.seek(0) # go to the beginning - click.echo("Saving db to %s" % fp) - fp.write(self.db.read()) + config = DeviceConfig(name=name, ip=ip, mac=mac, + model=model, token=token) + yield config def read_tokens(self, db): self.db = db + _LOGGER.info("Reading database from %s" % db) self.conn = sqlite3.connect(db) + self.conn.row_factory = sqlite3.Row with self.conn: is_android = self.conn.execute( @@ -72,38 +84,60 @@ def read_tokens(self, db): is_apple = self.conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None if is_android: - self.read_android() + yield from self.read_android() elif is_apple: - self.read_apple() + yield from self.read_apple() else: - click.echo("Error, unknown database type!") + _LOGGER.error("Error, unknown database type!") @click.command() @click.argument('backup') -@click.option('--write-to-disk', type=click.File('wb'), help='writes sqlite3 db to a file for debugging') -@click.option('--dump-all', is_flag=True, default=False, help='dump devices without ip addresses') +@click.option('--write-to-disk', type=click.File('wb'), + help='writes sqlite3 db to a file for debugging') +@click.option('--password', type=str, + help='password if the android database is encrypted') +@click.option('--dump-all', is_flag=True, default=False, + help='dump devices without ip addresses') @click.option('--dump-raw', is_flag=True, help='dumps raw rows') -def main(backup, write_to_disk, dump_all, dump_raw): +def main(backup, write_to_disk, password, dump_all, dump_raw): """Reads device information out from an sqlite3 DB. - If the given file is a .tar file, the file will be extracted - and the database automatically located (out of Android backups). + If the given file is an Android backup (.ab), the database + will be extracted automatically. + If the given file is an iOS backup, the tokens will be + extracted (and decrypted if needed) automatically. """ - reader = BackupDatabaseReader(dump_all, dump_raw) - if backup.endswith(".tar"): + + reader = BackupDatabaseReader(dump_raw) + if backup.endswith(".ab"): DBFILE = "apps/com.xiaomi.smarthome/db/miio2.db" - with tarfile.open(backup) as f: - click.echo("Opened %s" % backup) - db = f.extractfile(DBFILE) - with tempfile.NamedTemporaryFile() as fp: - click.echo("Extracting to %s" % fp.name) + with AndroidBackup(backup) as f: + tar = f.read_data(password) + try: + db = tar.extractfile(DBFILE) + except KeyError as ex: + click.echo("Unable to extract the database file %s: %s" % (DBFILE, ex)) + return + if write_to_disk: + file = write_to_disk + else: + file = tempfile.NamedTemporaryFile() + with file as fp: + click.echo("Saving database to %s" % fp.name) fp.write(db.read()) - if write_to_disk: - reader.dump_to_file(write_to_disk) - reader.read_tokens(fp.name) + devices = list(reader.read_tokens(fp.name)) else: - reader.read_tokens(backup) + devices = list(reader.read_tokens(backup)) + + for dev in devices: + if dev.ip or dump_all: + click.echo("%s\n" + "\tModel: %s\n" + "\tIP address: %s\n" + "\tToken: %s\n" + "\tMAC: %s" % (dev.name, dev.model, + dev.ip, dev.token, dev.mac)) if __name__ == "__main__":