Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Android Backups #419

Merged
merged 7 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader:
register("kape", "KapeLoader")
register("tanium", "TaniumLoader")
register("itunes", "ITunesLoader")
register("ab", "AndroidBackupLoader")
register("target", "TargetLoader")
register("log", "LogLoader")
# Disabling ResLoader because of DIS-536
Expand Down
285 changes: 285 additions & 0 deletions dissect/target/loaders/ab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
import hashlib
import io
import posixpath
import shutil
import struct
from pathlib import Path
from typing import BinaryIO

try:
from Crypto.Cipher import AES

HAS_PYCRYPTODOME = True
except ImportError:
HAS_PYCRYPTODOME = False

Check warning on line 14 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L13-L14

Added lines #L13 - L14 were not covered by tests

from dissect.util.stream import AlignedStream, RelativeStream, ZlibStream

from dissect.target.exceptions import LoaderError
from dissect.target.filesystem import VirtualFilesystem
from dissect.target.filesystems.tar import TarFilesystem
from dissect.target.helpers import keychain
from dissect.target.loader import Loader
from dissect.target.plugins.os.unix.linux.android._os import AndroidPlugin
from dissect.target.target import Target

DIRECTORY_MAPPING = {
"a": "/data/app/{id}",
"f": "/data/data/{id}/files",
"db": "/data/data/{id}/databases",
"ef": "/storage/emulated/0/Android/data/{id}",
"sp": "/data/data/{id}/shared_preferences",
"r": "/data/data/{id}",
"obb": "/storage/emulated/0/Android/obb/{id}",
}


class AndroidBackupLoader(Loader):
"""Load Android backup files.

References:
- http://fileformats.archiveteam.org/wiki/Android_ADB_Backup
"""

def __init__(self, path: Path, **kwargs):
super().__init__(path)
self.ab = AndroidBackup(path.open("rb"))

if self.ab.encrypted:
for key in keychain.get_keys_for_provider("ab") + keychain.get_keys_without_provider():
if key.key_type == keychain.KeyType.PASSPHRASE:
try:
self.ab.unlock(key.value)
break
except ValueError:
continue

Check warning on line 55 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L54-L55

Added lines #L54 - L55 were not covered by tests
else:
raise LoaderError(f"Missing password for encrypted Android Backup: {self.path}")

Check warning on line 57 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L57

Added line #L57 was not covered by tests

@staticmethod
def detect(path: Path) -> bool:
if path.suffix.lower() == ".ab":
return True

if path.is_file():

Check warning on line 64 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L64

Added line #L64 was not covered by tests
# The file extension can be chosen freely so let's also test for the file magic
with path.open("rb") as fh:
if fh.read(15) == b"ANDROID BACKUP\n":
return True

Check warning on line 68 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L66-L68

Added lines #L66 - L68 were not covered by tests

return False

Check warning on line 70 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L70

Added line #L70 was not covered by tests

def map(self, target: Target) -> None:
if self.ab.compressed or self.ab.encrypted:
if self.ab.compressed and not self.ab.encrypted:
word = "compressed"

Check warning on line 75 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L75

Added line #L75 was not covered by tests
elif self.ab.encrypted and not self.ab.compressed:
word = "encrypted"

Check warning on line 77 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L77

Added line #L77 was not covered by tests
else:
word = "compressed and encrypted"

target.log.warning(
f"Backup file is {word}, consider unwrapping with "
"`python -m dissect.target.loaders.ab <path/to/backup.ab>`"
)

vfs = VirtualFilesystem(case_sensitive=False)

fs = TarFilesystem(self.ab.open())
for app in fs.path("/apps").iterdir():
for subdir in app.iterdir():
if subdir.name not in DIRECTORY_MAPPING:
continue

path = DIRECTORY_MAPPING[subdir.name].format(id=app.name)

# TODO: Remove once we move towards "directory entries"
entry = subdir.get()
entry.name = posixpath.basename(path)

vfs.map_file_entry(path, entry)

target.filesystems.add(vfs)
target._os_plugin = AndroidPlugin.create(target, vfs)


class AndroidBackup:
def __init__(self, fh: BinaryIO):
self.fh = fh

size = fh.seek(0, io.SEEK_END)
fh.seek(0)

# Don't readline() straight away as we may be reading something other than a backup file
magic = fh.read(15)
if magic != b"ANDROID BACKUP\n":
raise ValueError("Not a valid Android Backup file")

Check warning on line 116 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L116

Added line #L116 was not covered by tests

self.version = int(fh.read(2)[:1])
self.compressed = bool(int(fh.read(2)[:1]))

self.encrypted = False
self.unlocked = True
self.encryption = fh.readline().strip().decode()

if self.encryption != "none":
self.encrypted = True
self.unlocked = False
self._user_salt = bytes.fromhex(fh.readline().strip().decode())
self._ck_salt = bytes.fromhex(fh.readline().strip().decode())
self._rounds = int(fh.readline().strip())
self._user_iv = bytes.fromhex(fh.readline().strip().decode())
self._master_key = bytes.fromhex(fh.readline().strip().decode())

self._mk = None
self._iv = None

self._data_offset = fh.tell()
self.size = size - self._data_offset

def unlock(self, password: str) -> None:
if not self.encrypted:
raise ValueError("Android Backup is not encrypted")

Check warning on line 142 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L142

Added line #L142 was not covered by tests

self._mk, self._iv = self._decrypt_mk(password)
self.unlocked = True

def _decrypt_mk(self, password: str) -> tuple[bytes, bytes]:
user_key = hashlib.pbkdf2_hmac("sha1", password.encode(), self._user_salt, self._rounds, 32)

blob = AES.new(user_key, AES.MODE_CBC, iv=self._user_iv).decrypt(self._master_key)
blob = blob[: -blob[-1]]

offset = 0
iv_len = blob[offset]
offset += 1
iv = blob[offset : offset + iv_len]

offset += iv_len
mk_len = blob[offset]
offset += 1
mk = blob[offset : offset + mk_len]

offset += mk_len
checksum_len = blob[offset]
offset += 1
checksum = blob[offset : offset + checksum_len]

ck_mk = _encode_bytes(mk) if self.version >= 2 else mk
our_checksum = hashlib.pbkdf2_hmac("sha1", ck_mk, self._ck_salt, self._rounds, 32)
if our_checksum != checksum:
# Try reverse encoding for good measure
ck_mk = mk if self.version >= 2 else _encode_bytes(mk)
our_checksum = hashlib.pbkdf2_hmac("sha1", ck_mk, self._ck_salt, self._rounds, 32)

Check warning on line 173 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L172-L173

Added lines #L172 - L173 were not covered by tests

if our_checksum != checksum:
raise ValueError("Invalid password: master key checksum does not match")

Check warning on line 176 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L176

Added line #L176 was not covered by tests

return mk, iv

def open(self) -> BinaryIO:
fh = RelativeStream(self.fh, self._data_offset)

if self.encrypted:
if not self.unlocked:
raise ValueError("Missing password for encrypted Android Backup")

Check warning on line 185 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L185

Added line #L185 was not covered by tests
fh = CipherStream(fh, self._mk, self._iv, self.size)

if self.compressed:
fh = ZlibStream(fh)

return fh


class CipherStream(AlignedStream):
"""Transparently AES-CBC decrypted stream."""

def __init__(self, fh: BinaryIO, key: bytes, iv: bytes, size: int):
self._fh = fh

self._key = key
self._iv = iv
self._cipher = None
self._cipher_offset = 0
self._reset_cipher()

super().__init__(size)

def _reset_cipher(self) -> None:
self._cipher = AES.new(self._key, AES.MODE_CBC, iv=self._iv)
self._cipher_offset = 0

def _seek_cipher(self, offset: int) -> None:
"""CBC is dependent on previous blocks so to seek the cipher, decrypt and discard to the wanted offset."""
if offset < self._cipher_offset:
self._reset_cipher()
self._fh.seek(0)

Check warning on line 216 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L215-L216

Added lines #L215 - L216 were not covered by tests

while self._cipher_offset < offset:
read_size = min(offset - self._cipher_offset, self.align)
self._cipher.decrypt(self._fh.read(read_size))
self._cipher_offset += read_size

Check warning on line 221 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L219-L221

Added lines #L219 - L221 were not covered by tests

def _read(self, offset: int, length: int) -> bytes:
self._seek_cipher(offset)

data = self._cipher.decrypt(self._fh.read(length))
if offset + length >= self.size:
# Remove padding
data = data[: -data[-1]]

self._cipher_offset += len(data)

return data


def _encode_bytes(buf: bytes) -> bytes:
# Emulate byte[] -> char[] -> utf8 byte[] casting
return struct.pack(">32h", *struct.unpack(">32b", buf)).decode("utf-16-be").encode("utf-8")


def main() -> None:
import argparse

parser = argparse.ArgumentParser(description="Android Backup file unwrapper")
parser.add_argument("path", type=Path, help="source path")
parser.add_argument("-p", "--password", help="encryption password")
parser.add_argument("-t", "--tar", action="store_true", help="write a tar file instead of a plain Android Backup")
parser.add_argument("-o", "--output", type=Path, help="output path")
args = parser.parse_args()

if not args.path.is_file():
parser.exit("source path does not exist or is not a file")

Check warning on line 252 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L252

Added line #L252 was not covered by tests

ext = ".tar" if args.tar else ".plain.ab"
if args.output is None:
output = args.path.with_suffix(ext)

Check warning on line 256 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L256

Added line #L256 was not covered by tests
elif args.output.is_dir():
output = args.output.joinpath(args.path.name).with_suffix(ext)
else:
output = args.output

Check warning on line 260 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L260

Added line #L260 was not covered by tests

if output.exists():
parser.exit(f"output path already exists: {output}")

Check warning on line 263 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L263

Added line #L263 was not covered by tests

print(f"unwrapping {args.path} -> {output}")
with args.path.open("rb") as fh:
ab = AndroidBackup(fh)

if ab.encrypted:
if not args.password:
parser.exit("missing password for encrypted Android Backup")

Check warning on line 271 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L271

Added line #L271 was not covered by tests
ab.unlock(args.password)

with ab.open() as fhab, output.open("wb") as fhout:
if not args.tar:
fhout.write(b"ANDROID BACKUP\n") # header
fhout.write(b"5\n") # version
fhout.write(b"0\n") # compressed
fhout.write(b"none\n") # encryption

shutil.copyfileobj(fhab, fhout, 1024 * 1024 * 64)


if __name__ == "__main__":
main()

Check warning on line 285 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L285

Added line #L285 was not covered by tests
39 changes: 15 additions & 24 deletions dissect/target/plugins/os/unix/linux/android/_os.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
from __future__ import annotations

from typing import Iterator, Optional, TextIO
from typing import Iterator, Optional

from dissect.target.filesystem import Filesystem
from dissect.target.helpers.record import UnixUserRecord
from dissect.target.helpers import configutil
from dissect.target.helpers.record import EmptyRecord
from dissect.target.plugin import OperatingSystem, export
from dissect.target.plugins.os.unix.linux._os import LinuxPlugin
from dissect.target.target import Target


class BuildProp:
def __init__(self, fh: TextIO):
self.props = {}

for line in fh:
line = line.strip()

if not line or line.startswith("#"):
continue

k, v = line.split("=")
self.props[k] = v


class AndroidPlugin(LinuxPlugin):
def __init__(self, target: Target):
super().__init__(target)
self.target = target
self.props = BuildProp(self.target.fs.path("/build.prop").open("rt"))

self.props = {}
if (build_prop := self.target.fs.path("/build.prop")).exists():
self.props = configutil.parse(build_prop, separator=("=",), comment_prefixes=("#",)).parsed_data

@classmethod
def detect(cls, target: Target) -> Optional[Filesystem]:
Expand All @@ -42,8 +32,8 @@
return cls(target)

@export(property=True)
def hostname(self) -> str:
return self.props.props["ro.build.host"]
def hostname(self) -> Optional[str]:
return self.props.get("ro.build.host")

@export(property=True)
def ips(self) -> list[str]:
Expand All @@ -53,11 +43,11 @@
def version(self) -> str:
full_version = "Android"

release_version = self.props.props.get("ro.build.version.release")
if release_version := self.props.props.get("ro.build.version.release"):
release_version = self.props.get("ro.build.version.release")
if release_version := self.props.get("ro.build.version.release"):
full_version += f" {release_version}"

if security_patch_version := self.props.props.get("ro.build.version.security_patch"):
if security_patch_version := self.props.get("ro.build.version.security_patch"):
full_version += f" ({security_patch_version})"

return full_version
Expand All @@ -66,5 +56,6 @@
def os(self) -> str:
return OperatingSystem.ANDROID.value

def users(self) -> Iterator[UnixUserRecord]:
raise NotImplementedError()
@export(record=EmptyRecord)
def users(self) -> Iterator[EmptyRecord]:
yield from ()

Check warning on line 61 in dissect/target/plugins/os/unix/linux/android/_os.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/android/_os.py#L61

Added line #L61 was not covered by tests
3 changes: 3 additions & 0 deletions tests/_data/loaders/ab/test.ab
Git LFS file not shown
Loading