From 2cbc99cfdbb9ca2e832009b99d2b8cdc8391829c Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 22 Jan 2025 11:09:25 +0100 Subject: [PATCH] add ContainerImageLoader and Filesystem --- dissect/target/filesystems/containerimage.py | 59 ++++++++++++++++++++ dissect/target/loader.py | 1 + dissect/target/loaders/containerimage.py | 39 +++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 dissect/target/filesystems/containerimage.py create mode 100644 dissect/target/loaders/containerimage.py diff --git a/dissect/target/filesystems/containerimage.py b/dissect/target/filesystems/containerimage.py new file mode 100644 index 000000000..9bc9048ce --- /dev/null +++ b/dissect/target/filesystems/containerimage.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import json +import logging +from pathlib import Path + +from dissect.target.filesystem import LayerFilesystem +from dissect.target.filesystems.tar import TarFilesystem + +log = logging.getLogger(__name__) + + +class ContainerImageFilesystem(LayerFilesystem): + """Container image filesystem implementation. + + ..code-block:: + + docker image save example:latest -o image.tar + + References: + - https://snyk.io/blog/container-image-formats/ + - https://github.com/moby/docker-image-spec/ + - https://github.com/opencontainers/image-spec/ + """ + + __type__ = "container" + + def __init__(self, path: Path, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._path = path + self.tar = TarFilesystem(path.open("rb")) + + try: + self.manifest = json.loads(self.tar.path("/manifest.json").read_text())[0] + except Exception as e: + self.manifest = None + raise ValueError(f"Unable to read manifest.json inside docker image filesystem: {str(e)}") + + self.name = self.manifest.get("RepoTags", [None])[0] + + try: + self.config = json.loads(self.tar.path(self.manifest.get("Config")).read_text()) + except Exception as e: + self.config = None + raise ValueError(f"Unable to read config inside docker image filesystem: {str(e)}") + + for layer in [self.tar.path(p) for p in self.manifest.get("Layers", [])]: + if not layer.exists(): + log.warning("Layer %s does not exist in container image", layer) + continue + + fs = TarFilesystem(layer.open("rb")) + self.append_fs_layer(fs) + + self.append_layer().mount("$fs$/container", self.tar) + + def __repr__(self) -> str: + return f"<{self.__class__.__name__} path={self._path} name={self.name}>" diff --git a/dissect/target/loader.py b/dissect/target/loader.py index 08d5e29b0..8a336c881 100644 --- a/dissect/target/loader.py +++ b/dissect/target/loader.py @@ -178,6 +178,7 @@ def open(item: Union[str, Path], *args, **kwargs) -> Loader: register("remote", "RemoteLoader") register("mqtt", "MQTTLoader") register("asdf", "AsdfLoader") +register("containerimage", "ContainerImageLoader") # Should be before TarLoader register("tar", "TarLoader") register("vmx", "VmxLoader") register("vmwarevm", "VmwarevmLoader") diff --git a/dissect/target/loaders/containerimage.py b/dissect/target/loaders/containerimage.py new file mode 100644 index 000000000..4e3ceb15d --- /dev/null +++ b/dissect/target/loaders/containerimage.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from dissect.target.filesystems.containerimage import ContainerImageFilesystem +from dissect.target.filesystems.tar import TarFilesystem +from dissect.target.helpers.fsutil import TargetPath +from dissect.target.loader import Loader +from dissect.target.loaders.tar import TarLoader +from dissect.target.target import Target + +DOCKER_ARCHIVE_IMAGE = { + "/manifest.json", + "/repositories", +} + +OCI_IMAGE = { + "/manifest.json", + "/repositories", + "/blobs", + "/oci-layout", + "/index.json", +} + + +class ContainerImageLoader(Loader): + """Load saved container images.""" + + def __init__(self, path: TargetPath, **kwargs): + super().__init__(path.resolve(), **kwargs) + + @staticmethod + def detect(path: TargetPath) -> bool: + return ( + TarLoader.detect(path) + and (root := set(map(str, TarFilesystem(path.open("rb")).path("/").iterdir()))) + and (OCI_IMAGE.issubset(root) or DOCKER_ARCHIVE_IMAGE.issubset(root)) + ) + + def map(self, target: Target) -> None: + target.filesystems.add(ContainerImageFilesystem(self.path))