Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Android Backups #419

Merged
merged 7 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader:
register("kape", "KapeLoader")
register("tanium", "TaniumLoader")
register("itunes", "ITunesLoader")
register("ab", "AndroidBackupLoader")
register("target", "TargetLoader")
register("log", "LogLoader")
# Disabling ResLoader because of DIS-536
Expand Down
287 changes: 287 additions & 0 deletions dissect/target/loaders/ab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import hashlib
import io
import posixpath
import shutil
import struct
from pathlib import Path
from typing import BinaryIO

try:
from Crypto.Cipher import AES

HAS_PYCRYPTODOME = True
except ImportError:
HAS_PYCRYPTODOME = False

Check warning on line 14 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L13-L14

Added lines #L13 - L14 were not covered by tests

from dissect.util.stream import AlignedStream, RelativeStream, ZlibStream

from dissect.target.exceptions import LoaderError
from dissect.target.filesystem import VirtualFilesystem
from dissect.target.filesystems.tar import TarFilesystem
from dissect.target.helpers import keychain
from dissect.target.loader import Loader
from dissect.target.plugins.os.unix.linux.android._os import AndroidPlugin
from dissect.target.target import Target

DIRECTORY_MAPPING = {
"a": "/data/app/{id}",
"f": "/data/data/{id}/files",
"db": "/data/data/{id}/databases",
"ef": "/storage/emulated/0/Android/data/{id}",
"sp": "/data/data/{id}/shared_preferences",
"r": "/data/data/{id}",
"obb": "/storage/emulated/0/Android/obb/{id}",
}


class AndroidBackupLoader(Loader):
"""Load Android backup files.

References:
- http://fileformats.archiveteam.org/wiki/Android_ADB_Backup
"""

def __init__(self, path: Path, **kwargs):
super().__init__(path)
self.ab = AndroidBackup(path.open("rb"))

if self.ab.encrypted:
for key in keychain.get_keys_for_provider("ab") + keychain.get_keys_without_provider():
if key.key_type == keychain.KeyType.PASSPHRASE:
try:
self.ab.unlock(key.value)
break
except ValueError:
continue

Check warning on line 55 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L54-L55

Added lines #L54 - L55 were not covered by tests
else:
raise LoaderError(f"Missing password for encrypted Android Backup: {self.path}")

Check warning on line 57 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L57

Added line #L57 was not covered by tests

@staticmethod
def detect(path: Path) -> bool:
if path.suffix.lower() == ".ab":
return True

if path.is_file():

Check warning on line 64 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L64

Added line #L64 was not covered by tests
# The file extension can be chosen freely so let's also test for the file magic
with path.open("rb") as fh:
if fh.read(15) == b"ANDROID BACKUP\n":
return True

Check warning on line 68 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L66-L68

Added lines #L66 - L68 were not covered by tests

return False

Check warning on line 70 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L70

Added line #L70 was not covered by tests

def map(self, target: Target) -> None:
if self.ab.compressed or self.ab.encrypted:
if self.ab.compressed and not self.ab.encrypted:
word = "compressed"

Check warning on line 75 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L75

Added line #L75 was not covered by tests
elif self.ab.encrypted and not self.ab.compressed:
word = "encrypted"

Check warning on line 77 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L77

Added line #L77 was not covered by tests
else:
word = "compressed and encrypted"

target.log.warning(
f"Backup file is {word}, consider unwrapping with "
"`python -m dissect.target.loaders.ab <path/to/backup.ab>`"
)

vfs = VirtualFilesystem(case_sensitive=False)

fs = TarFilesystem(self.ab.open())
for app in fs.path("/apps").iterdir():
for subdir in app.iterdir():
if subdir.name not in DIRECTORY_MAPPING:
continue

path = DIRECTORY_MAPPING[subdir.name].format(id=app.name)

# TODO: Remove once we move towards "directory entries"
entry = subdir.get()
entry.name = posixpath.basename(path)

vfs.map_file_entry(path, entry)

target.filesystems.add(vfs)

target.fs.mount("/", vfs)
target._os_plugin = AndroidPlugin(target)


class AndroidBackup:
def __init__(self, fh: BinaryIO):
self.fh = fh

size = fh.seek(0, io.SEEK_END)
fh.seek(0)

# Don't readline() straight away as we may be reading something other than a backup file
magic = fh.read(15)
if magic != b"ANDROID BACKUP\n":
raise ValueError("Not a valid Android Backup file")

Check warning on line 118 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L118

Added line #L118 was not covered by tests

self.version = int(fh.read(2)[:1])
self.compressed = bool(int(fh.read(2)[:1]))

self.encrypted = False
self.unlocked = True
self.encryption = fh.readline().strip().decode()

if self.encryption != "none":
self.encrypted = True
self.unlocked = False
self._user_salt = bytes.fromhex(fh.readline().strip().decode())
self._ck_salt = bytes.fromhex(fh.readline().strip().decode())
self._rounds = int(fh.readline().strip())
self._user_iv = bytes.fromhex(fh.readline().strip().decode())
self._master_key = bytes.fromhex(fh.readline().strip().decode())

self._mk = None
self._iv = None

self._data_offset = fh.tell()
self.size = size - self._data_offset

def unlock(self, password: str) -> None:
if not self.encrypted:
raise ValueError("Android Backup is not encrypted")

Check warning on line 144 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L144

Added line #L144 was not covered by tests

self._mk, self._iv = self._decrypt_mk(password)
self.unlocked = True

def _decrypt_mk(self, password: str) -> tuple[bytes, bytes]:
user_key = hashlib.pbkdf2_hmac("sha1", password.encode(), self._user_salt, self._rounds, 32)

blob = AES.new(user_key, AES.MODE_CBC, iv=self._user_iv).decrypt(self._master_key)
blob = blob[: -blob[-1]]

offset = 0
iv_len = blob[offset]
offset += 1
iv = blob[offset : offset + iv_len]

offset += iv_len
mk_len = blob[offset]
offset += 1
mk = blob[offset : offset + mk_len]

offset += mk_len
checksum_len = blob[offset]
offset += 1
checksum = blob[offset : offset + checksum_len]

ck_mk = _encode_bytes(mk) if self.version >= 2 else mk
our_checksum = hashlib.pbkdf2_hmac("sha1", ck_mk, self._ck_salt, self._rounds, 32)
if our_checksum != checksum:
# Try reverse encoding for good measure
ck_mk = mk if self.version >= 2 else _encode_bytes(mk)
our_checksum = hashlib.pbkdf2_hmac("sha1", ck_mk, self._ck_salt, self._rounds, 32)

Check warning on line 175 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L174-L175

Added lines #L174 - L175 were not covered by tests

if our_checksum != checksum:
raise ValueError("Invalid password: master key checksum does not match")

Check warning on line 178 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L178

Added line #L178 was not covered by tests

return mk, iv

def open(self) -> BinaryIO:
fh = RelativeStream(self.fh, self._data_offset)

if self.encrypted:
if not self.unlocked:
raise ValueError("Missing password for encrypted Android Backup")

Check warning on line 187 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L187

Added line #L187 was not covered by tests
fh = CipherStream(fh, self._mk, self._iv, self.size)

if self.compressed:
fh = ZlibStream(fh)

return fh


class CipherStream(AlignedStream):
"""Transparently AES-CBC decrypted stream."""

def __init__(self, fh: BinaryIO, key: bytes, iv: bytes, size: int):
self._fh = fh

self._key = key
self._iv = iv
self._cipher = None
self._cipher_offset = 0
self._reset_cipher()

super().__init__(size)

def _reset_cipher(self) -> None:
self._cipher = AES.new(self._key, AES.MODE_CBC, iv=self._iv)
self._cipher_offset = 0

def _seek_cipher(self, offset: int) -> None:
"""CBC is dependent on previous blocks so to seek the cipher, decrypt and discard to the wanted offset."""
if offset < self._cipher_offset:
self._reset_cipher()
self._fh.seek(0)

Check warning on line 218 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L217-L218

Added lines #L217 - L218 were not covered by tests

while self._cipher_offset < offset:
read_size = min(offset - self._cipher_offset, self.align)
self._cipher.decrypt(self._fh.read(read_size))
self._cipher_offset += read_size

Check warning on line 223 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L221-L223

Added lines #L221 - L223 were not covered by tests

def _read(self, offset: int, length: int) -> bytes:
self._seek_cipher(offset)

data = self._cipher.decrypt(self._fh.read(length))
if offset + length >= self.size:
# Remove padding
data = data[: -data[-1]]

self._cipher_offset += len(data)

return data


def _encode_bytes(buf: bytes) -> bytes:
# Emulate byte[] -> char[] -> utf8 byte[] casting
return struct.pack(">32h", *struct.unpack(">32b", buf)).decode("utf-16-be").encode("utf-8")


def main() -> None:
import argparse

parser = argparse.ArgumentParser(description="Android Backup file unwrapper")
parser.add_argument("path", type=Path, help="source path")
parser.add_argument("-p", "--password", help="encryption password")
parser.add_argument("-t", "--tar", action="store_true", help="write a tar file instead of a plain Android Backup")
parser.add_argument("-o", "--output", type=Path, help="output path")
args = parser.parse_args()

if not args.path.is_file():
parser.exit("source path does not exist or is not a file")

Check warning on line 254 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L254

Added line #L254 was not covered by tests

ext = ".tar" if args.tar else ".plain.ab"
if args.output is None:
output = args.path.with_suffix(ext)

Check warning on line 258 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L258

Added line #L258 was not covered by tests
elif args.output.is_dir():
output = args.output.joinpath(args.path.name).with_suffix(ext)
else:
output = args.output

Check warning on line 262 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L262

Added line #L262 was not covered by tests

if output.exists():
parser.exit(f"output path already exists: {output}")

Check warning on line 265 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L265

Added line #L265 was not covered by tests

print(f"unwrapping {args.path} -> {output}")
with args.path.open("rb") as fh:
ab = AndroidBackup(fh)

if ab.encrypted:
if not args.password:
parser.exit("missing password for encrypted Android Backup")

Check warning on line 273 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L273

Added line #L273 was not covered by tests
ab.unlock(args.password)

with ab.open() as fhab, output.open("wb") as fhout:
if not args.tar:
fhout.write(b"ANDROID BACKUP\n") # header
fhout.write(b"5\n") # version
fhout.write(b"0\n") # compressed
fhout.write(b"none\n") # encryption

shutil.copyfileobj(fhab, fhout, 1024 * 1024 * 64)


if __name__ == "__main__":
main()

Check warning on line 287 in dissect/target/loaders/ab.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/loaders/ab.py#L287

Added line #L287 was not covered by tests
39 changes: 15 additions & 24 deletions dissect/target/plugins/os/unix/linux/android/_os.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
from __future__ import annotations

from typing import Iterator, Optional, TextIO
from typing import Iterator, Optional

from dissect.target.filesystem import Filesystem
from dissect.target.helpers.record import UnixUserRecord
from dissect.target.helpers import configutil
from dissect.target.helpers.record import EmptyRecord
from dissect.target.plugin import OperatingSystem, export
from dissect.target.plugins.os.unix.linux._os import LinuxPlugin
from dissect.target.target import Target


class BuildProp:
def __init__(self, fh: TextIO):
self.props = {}

for line in fh:
line = line.strip()

if not line or line.startswith("#"):
continue

k, v = line.split("=")
self.props[k] = v


class AndroidPlugin(LinuxPlugin):
def __init__(self, target: Target):
super().__init__(target)
self.target = target
self.props = BuildProp(self.target.fs.path("/build.prop").open("rt"))

self.props = {}
if (build_prop := self.target.fs.path("/build.prop")).exists():
self.props = configutil.parse(build_prop, separator=("=",), comment_prefixes=("#",)).parsed_data

@classmethod
def detect(cls, target: Target) -> Optional[Filesystem]:
Expand All @@ -42,8 +32,8 @@
return cls(target)

@export(property=True)
def hostname(self) -> str:
return self.props.props["ro.build.host"]
def hostname(self) -> Optional[str]:
return self.props.get("ro.build.host")

@export(property=True)
def ips(self) -> list[str]:
Expand All @@ -53,11 +43,11 @@
def version(self) -> str:
full_version = "Android"

release_version = self.props.props.get("ro.build.version.release")
if release_version := self.props.props.get("ro.build.version.release"):
release_version = self.props.get("ro.build.version.release")
if release_version := self.props.get("ro.build.version.release"):
full_version += f" {release_version}"

if security_patch_version := self.props.props.get("ro.build.version.security_patch"):
if security_patch_version := self.props.get("ro.build.version.security_patch"):
full_version += f" ({security_patch_version})"

return full_version
Expand All @@ -66,5 +56,6 @@
def os(self) -> str:
return OperatingSystem.ANDROID.value

def users(self) -> Iterator[UnixUserRecord]:
raise NotImplementedError()
@export(record=EmptyRecord)
def users(self) -> Iterator[EmptyRecord]:
yield from ()

Check warning on line 61 in dissect/target/plugins/os/unix/linux/android/_os.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/unix/linux/android/_os.py#L61

Added line #L61 was not covered by tests
3 changes: 3 additions & 0 deletions tests/_data/loaders/ab/test.ab
Git LFS file not shown
Loading
Loading