From 3be9f4cdcd1195afc1ea7806f1c1f063679a42ba Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 14 Sep 2017 17:01:04 +0200 Subject: [PATCH 1/3] Add miio-extract-tokens tool for extracting tokens from sqlite databases. This is to simplify the process for token and device type extraction, and will probably later merged to the CLI tool to generate config files based on known devices. Tested to work fine on .tar files extracted from Android backups, support for Apple databases is incomplete. Related to #75. --- mirobo/extract_tokens.py | 96 ++++++++++++++++++++++++++++++++++++++++ setup.py | 3 +- 2 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 mirobo/extract_tokens.py diff --git a/mirobo/extract_tokens.py b/mirobo/extract_tokens.py new file mode 100644 index 000000000..51e235df1 --- /dev/null +++ b/mirobo/extract_tokens.py @@ -0,0 +1,96 @@ +import click +import tarfile +import tempfile +import sqlite3 +import binascii +from Crypto.Cipher import AES +from pprint import pformat as pf + +def dump_raw(dev): + raw = {k: dev[k] for k in dev.keys()} + click.echo(pf(raw)) + +def decrypt_ztoken(ztoken): + if len(ztoken) <= 32: + return ztoken + + keystring = '00000000000000000000000000000000' + key = bytes.fromhex(keystring) + cipher = AES.new(key, AES.MODE_ECB) + token = cipher.decrypt(bytes.fromhex(ztoken[:64])) + + return token + + +def read_apple(conn): + click.echo("Reading tokens from Apple DB") + c = conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';") + for dev in c.fetchall(): + token = decrypt_ztoken(dev['ZTOKEN']) + ip = dev['ZLOCALIP'] + click.echo("device at %s. token: %s" % (ip, token)) + dump_raw(dev) + raise NotImplementedError("Please report the previous output to developers") + + +def read_android(conn): + click.echo("Reading tokens from Android DB") + c = conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';") + for dev in c.fetchall(): + # dump_raw(dev) + ip = dev['localIP'] + mac = dev['mac'] + model = dev['model'] + name = dev['name'] + ssid = dev['ssid'] + token = dev['token'] + click.echo("%s (%s) at %s. token: %s (mac: %s, ssid: %s)" % (name, model, ip, token, mac, ssid)) + +def write(db, fp): + fp.open() + db.seek(0) # go to the beginning + click.echo("Saving db to %s" % fp) + fp.write(db.read()) + +def read_tokens(db): + conn = sqlite3.connect(db) + conn.row_factory = sqlite3.Row + with conn: + is_android = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None + is_apple = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None + if is_android: + read_android(conn) + elif is_apple: + read_apple(conn) + else: + click.echo("Error, unknown database type!") + + +@click.command() +@click.argument('backup') +@click.option('--write-to-disk', type=click.File('wb'), help='writes sqlite3 db to a file for debugging') +def main(backup, write_to_disk): + """Reads device information out from an sqlite3 DB. + If the given file is a .tar file, the file will be extracted + and the database automatically located (out of Android backups). + """ + if backup.endswith(".tar"): + DBFILE = "apps/com.xiaomi.smarthome/db/miio2.db" + with tarfile.open(backup) as f: + click.echo("Opened %s" % backup) + db = f.extractfile(DBFILE) + with tempfile.NamedTemporaryFile() as fp: + click.echo("Extracting to %s" % fp.name) + fp.write(db.read()) + if write_to_disk: + write(db, write_to_disk) + + read_tokens(fp.name) + else: + read_tokens(backup) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/setup.py b/setup.py index e4bbab0c9..0fc2e65a1 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,8 @@ 'mirobo=mirobo.vacuum_cli:cli', 'miplug=mirobo.plug_cli:cli', 'miceil=mirobo.ceil_cli:cli', - 'mieye=mirobo.philips_eyecare_cli:cli' + 'mieye=mirobo.philips_eyecare_cli:cli', + 'miio-extract-tokens=mirobo.extract_tokens:main' ], }, ) From 2000fabfa7d30e388eb7c5184cc3ff21f8078231 Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 14 Sep 2017 17:04:49 +0200 Subject: [PATCH 2/3] Fix linting except for overlong lines --- mirobo/extract_tokens.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mirobo/extract_tokens.py b/mirobo/extract_tokens.py index 51e235df1..ba60614cc 100644 --- a/mirobo/extract_tokens.py +++ b/mirobo/extract_tokens.py @@ -2,14 +2,15 @@ import tarfile import tempfile import sqlite3 -import binascii from Crypto.Cipher import AES from pprint import pformat as pf + def dump_raw(dev): raw = {k: dev[k] for k in dev.keys()} click.echo(pf(raw)) + def decrypt_ztoken(ztoken): if len(ztoken) <= 32: return ztoken @@ -46,12 +47,14 @@ def read_android(conn): token = dev['token'] click.echo("%s (%s) at %s. token: %s (mac: %s, ssid: %s)" % (name, model, ip, token, mac, ssid)) + def write(db, fp): fp.open() db.seek(0) # go to the beginning click.echo("Saving db to %s" % fp) fp.write(db.read()) + def read_tokens(db): conn = sqlite3.connect(db) conn.row_factory = sqlite3.Row @@ -93,4 +96,4 @@ def main(backup, write_to_disk): if __name__ == "__main__": - main() \ No newline at end of file + main() From fdfb5de605b91ec3782f567252ed6d16b859bd9f Mon Sep 17 00:00:00 2001 From: Teemu Rytilahti Date: Thu, 14 Sep 2017 17:51:27 +0200 Subject: [PATCH 3/3] Fix Apple database support * Refactor the code into a class for further use * add --dump-raw and --dump-all (prints devs without IP addrs, such as linked BT devices) --- mirobo/extract_tokens.py | 145 +++++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 67 deletions(-) diff --git a/mirobo/extract_tokens.py b/mirobo/extract_tokens.py index ba60614cc..d848ddb85 100644 --- a/mirobo/extract_tokens.py +++ b/mirobo/extract_tokens.py @@ -6,79 +6,90 @@ from pprint import pformat as pf -def dump_raw(dev): - raw = {k: dev[k] for k in dev.keys()} - click.echo(pf(raw)) - - -def decrypt_ztoken(ztoken): - if len(ztoken) <= 32: - return ztoken - - keystring = '00000000000000000000000000000000' - key = bytes.fromhex(keystring) - cipher = AES.new(key, AES.MODE_ECB) - token = cipher.decrypt(bytes.fromhex(ztoken[:64])) - - return token - - -def read_apple(conn): - click.echo("Reading tokens from Apple DB") - c = conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';") - for dev in c.fetchall(): - token = decrypt_ztoken(dev['ZTOKEN']) - ip = dev['ZLOCALIP'] - click.echo("device at %s. token: %s" % (ip, token)) - dump_raw(dev) - raise NotImplementedError("Please report the previous output to developers") - - -def read_android(conn): - click.echo("Reading tokens from Android DB") - c = conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';") - for dev in c.fetchall(): - # dump_raw(dev) - ip = dev['localIP'] - mac = dev['mac'] - model = dev['model'] - name = dev['name'] - ssid = dev['ssid'] - token = dev['token'] - click.echo("%s (%s) at %s. token: %s (mac: %s, ssid: %s)" % (name, model, ip, token, mac, ssid)) - - -def write(db, fp): - fp.open() - db.seek(0) # go to the beginning - click.echo("Saving db to %s" % fp) - fp.write(db.read()) - - -def read_tokens(db): - conn = sqlite3.connect(db) - conn.row_factory = sqlite3.Row - with conn: - is_android = conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None - is_apple = conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None - if is_android: - read_android(conn) - elif is_apple: - read_apple(conn) - else: - click.echo("Error, unknown database type!") +class BackupDatabaseReader: + def __init__(self, dump_all=False, dump_raw=False): + self.dump_all = dump_all + self.dump_raw = dump_raw + + @staticmethod + def dump_raw(dev): + raw = {k: dev[k] for k in dev.keys()} + click.echo(pf(raw)) + + @staticmethod + def decrypt_ztoken(ztoken): + if len(ztoken) <= 32: + return ztoken + + keystring = '00000000000000000000000000000000' + key = bytes.fromhex(keystring) + cipher = AES.new(key, AES.MODE_ECB) + token = cipher.decrypt(bytes.fromhex(ztoken[:64])) + + return token.decode() + + def read_apple(self): + click.echo("Reading tokens from Apple DB") + c = self.conn.execute("SELECT * FROM ZDEVICE WHERE ZTOKEN IS NOT '';") + for dev in c.fetchall(): + if self.dump_raw: + BackupDatabaseReader.dump_raw(dev) + ip = dev['ZLOCALIP'] + mac = dev['ZMAC'] + model = dev['ZMODEL'] + name = dev['ZNAME'] + token = BackupDatabaseReader.decrypt_ztoken(dev['ZTOKEN']) + if ip or self.dump_all: + click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac)) + + def read_android(self): + click.echo("Reading tokens from Android DB") + c = self.conn.execute("SELECT * FROM devicerecord WHERE token IS NOT '';") + for dev in c.fetchall(): + if self.dump_raw: + BackupDatabaseReader.dump_raw(dev) + ip = dev['localIP'] + mac = dev['mac'] + model = dev['model'] + name = dev['name'] + token = dev['token'] + if ip or self.dump_all: + click.echo("%s\n\tModel: %s\n\tIP address: %s\n\tToken: %s\n\tMAC: %s" % (name, model, ip, token, mac)) + + def dump_to_file(self, fp): + fp.open() + self.db.seek(0) # go to the beginning + click.echo("Saving db to %s" % fp) + fp.write(self.db.read()) + + def read_tokens(self, db): + self.db = db + self.conn = sqlite3.connect(db) + self.conn.row_factory = sqlite3.Row + with self.conn: + is_android = self.conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='devicerecord';").fetchone() is not None + is_apple = self.conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='ZDEVICE'").fetchone() is not None + if is_android: + self.read_android() + elif is_apple: + self.read_apple() + else: + click.echo("Error, unknown database type!") @click.command() @click.argument('backup') @click.option('--write-to-disk', type=click.File('wb'), help='writes sqlite3 db to a file for debugging') -def main(backup, write_to_disk): +@click.option('--dump-all', is_flag=True, default=False, help='dump devices without ip addresses') +@click.option('--dump-raw', is_flag=True, help='dumps raw rows') +def main(backup, write_to_disk, dump_all, dump_raw): """Reads device information out from an sqlite3 DB. If the given file is a .tar file, the file will be extracted and the database automatically located (out of Android backups). """ + reader = BackupDatabaseReader(dump_all, dump_raw) if backup.endswith(".tar"): DBFILE = "apps/com.xiaomi.smarthome/db/miio2.db" with tarfile.open(backup) as f: @@ -88,11 +99,11 @@ def main(backup, write_to_disk): click.echo("Extracting to %s" % fp.name) fp.write(db.read()) if write_to_disk: - write(db, write_to_disk) + reader.dump_to_file(write_to_disk) - read_tokens(fp.name) + reader.read_tokens(fp.name) else: - read_tokens(backup) + reader.read_tokens(backup) if __name__ == "__main__":