-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Update Carbon Black loader to latest CBC API (#244)
Co-authored-by: Schamper <1254028+Schamper@users.noreply.github.com>
- Loading branch information
Showing
9 changed files
with
453 additions
and
144 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,122 +1,151 @@ | ||
from __future__ import annotations | ||
|
||
import stat | ||
from datetime import datetime, timezone | ||
from enum import IntEnum | ||
from typing import Any, BinaryIO, Iterator | ||
|
||
from cbapi.live_response_api import LiveResponseError | ||
from cbc_sdk.live_response_api import LiveResponseError, LiveResponseSession | ||
from dissect.util import ts | ||
|
||
from dissect.target.exceptions import FileNotFoundError, NotADirectoryError | ||
from dissect.target.filesystem import Filesystem, FilesystemEntry | ||
from dissect.target.helpers import fsutil | ||
|
||
EPOCH = datetime(1970, 1, 1) | ||
CB_TIMEFORMAT = "%Y-%m-%dT%H:%M:%S%fZ" | ||
|
||
|
||
class OS(IntEnum): | ||
WINDOWS = 1 | ||
LINUX = 2 | ||
MAC = 4 | ||
|
||
|
||
class CbFilesystem(Filesystem): | ||
__fstype__ = "cb" | ||
|
||
def __init__(self, cb, sensor, session, prefix, *args, **kwargs): | ||
super().__init__(None, *args, **kwargs) | ||
self.cb = cb | ||
self.sensor = sensor | ||
def __init__(self, session: LiveResponseSession, prefix: str, *args, **kwargs): | ||
self.session = session | ||
self.prefix = prefix | ||
self.prefix = prefix.lower() | ||
|
||
if self.session.os_type == OS.WINDOWS: | ||
alt_separator = "\\" | ||
case_sensitive = False | ||
else: | ||
alt_separator = "" | ||
case_sensitive = True | ||
|
||
super().__init__(alt_separator=alt_separator, case_sensitive=case_sensitive, *args, **kwargs) | ||
|
||
@staticmethod | ||
def _detect(fh): | ||
def detect(fh: BinaryIO) -> bool: | ||
raise TypeError("Detect is not allowed on CbFilesystem class") | ||
|
||
def get(self, path): | ||
path = path.strip("/") | ||
|
||
if self.session.os_type == 1: | ||
cbpath = path.replace("/", "\\") | ||
else: | ||
cbpath = path | ||
def get(self, path: str) -> CbFilesystemEntry: | ||
"""Returns a CbFilesystemEntry object corresponding to the given path.""" | ||
cbpath = fsutil.normalize(path, alt_separator=self.alt_separator).strip("/") | ||
if self.session.os_type == OS.WINDOWS: | ||
cbpath = cbpath.replace("/", "\\") | ||
|
||
if self.prefix and not cbpath.lower().startswith(self.prefix.lower()): | ||
cbpath = self.prefix + cbpath | ||
cbpath = self.prefix + cbpath | ||
|
||
try: | ||
res = self.session.list_directory(cbpath) | ||
if len(res) == 1: | ||
entry = res[0] | ||
else: | ||
# Root entries return the full dirlisting, make up our own entry | ||
if cbpath == self.prefix: | ||
# Root entries behave funky, so make up our own entry | ||
entry = { | ||
"filename": "\\", | ||
"filename": self.prefix, | ||
"attributes": ["DIRECTORY"], | ||
"last_access_time": 0, | ||
"last_write_time": 0, | ||
"last_access_time": "1970-01-01T00:00:00Z", | ||
"last_write_time": "1970-01-01T00:00:00Z", | ||
"create_time": "1970-01-01T00:00:00Z", | ||
"size": 0, | ||
} | ||
else: | ||
res = self.session.list_directory(cbpath) | ||
if len(res) != 1: | ||
raise FileNotFoundError(path) | ||
|
||
entry = res[0] | ||
|
||
return CbFilesystemEntry(self, cbpath, entry) | ||
return CbFilesystemEntry(self, path, entry, cbpath) | ||
except LiveResponseError: | ||
raise FileNotFoundError(path) | ||
|
||
|
||
class CbFilesystemEntry(FilesystemEntry): | ||
def get(self, path): | ||
fullpath = self.fs.session.path_join(self.path, path) | ||
return self.fs.get(fullpath) | ||
|
||
def open(self): | ||
return self.fs.session.get_raw_file(self.path) | ||
|
||
def iterdir(self): | ||
for f in self.scandir(): | ||
yield f.name | ||
|
||
def scandir(self): | ||
def __init__(self, fs: Filesystem, path: str, entry: Any, cbpath: str) -> None: | ||
super().__init__(fs, path, entry) | ||
self.cbpath = cbpath | ||
|
||
def get(self, path: str) -> CbFilesystemEntry: | ||
"""Get a filesystem entry relative from the current one.""" | ||
full_path = fsutil.join(self.path, path) | ||
return self.fs.get(full_path) | ||
|
||
def open(self) -> BinaryIO: | ||
"""Returns file handle (file-like object).""" | ||
return self.fs.session.get_raw_file(self.cbpath) | ||
|
||
def iterdir(self) -> Iterator[str]: | ||
"""List the directory contents of a directory. Returns a generator of strings.""" | ||
for entry in self.scandir(): | ||
yield entry.name | ||
|
||
def scandir(self) -> Iterator[CbFilesystemEntry]: | ||
"""List the directory contents of this directory. Returns a generator of filesystem entries.""" | ||
if not self.is_dir(): | ||
raise NotADirectoryError(f"'{self.path}' is not a directory") | ||
|
||
if self.fs.session.os_type == 1: | ||
stripped_path = self.path.rstrip("\\") | ||
path = f"{stripped_path}\\" | ||
else: | ||
path = f"{self.path.rstrip('/')}/" | ||
|
||
for f in self.fs.session.list_directory(path): | ||
if f["filename"] in (".", ".."): | ||
seperator = self.fs.alt_separator or "/" | ||
for entry in self.fs.session.list_directory(self.cbpath + seperator): | ||
filename = entry["filename"] | ||
if filename in (".", ".."): | ||
continue | ||
|
||
yield CbFilesystemEntry(self.fs, self.fs.session.path_join(path, f["filename"]), f) | ||
path = fsutil.join(self.path, filename, alt_separator=self.fs.alt_separator) | ||
cbpath = seperator.join([self.cbpath, filename]) | ||
yield CbFilesystemEntry(self.fs, path, entry, cbpath) | ||
|
||
def is_dir(self, follow_symlinks: bool = True) -> bool: | ||
"""Return whether this entry is a directory.""" | ||
return "DIRECTORY" in self.entry["attributes"] | ||
|
||
def is_file(self, follow_symlinks: bool = True) -> bool: | ||
return "ARCHIVE" in self.entry["attributes"] | ||
"""Return whether this entry is a file.""" | ||
return not self.is_dir() | ||
|
||
def is_symlink(self) -> bool: | ||
"""Return whether this entry is a link.""" | ||
return False | ||
|
||
def stat(self, follow_symlinks: bool = True) -> fsutil.stat_result: | ||
"""Return the stat information of this entry.""" | ||
return self.lstat() | ||
|
||
def lstat(self) -> fsutil.stat_result: | ||
"""Return the stat information of the given path, without resolving links.""" | ||
mode = stat.S_IFDIR if self.is_dir() else stat.S_IFREG | ||
|
||
# mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime | ||
atime = ts.to_unix(_parse_ts(self.entry["last_access_time"])) | ||
mtime = ts.to_unix(_parse_ts(self.entry["last_write_time"])) | ||
ctime = ts.to_unix(_parse_ts(self.entry["create_time"])) | ||
|
||
# ['mode', 'addr', 'dev', 'nlink', 'uid', 'gid', 'size', 'atime', 'mtime', 'ctime'] | ||
st_info = [ | ||
mode | 0o755, | ||
fsutil.generate_addr(self.path, alt_separator=self.fs.alt_separator), | ||
fsutil.generate_addr(self.cbpath), | ||
id(self.fs), | ||
0, | ||
0, | ||
0, | ||
self.entry["size"], | ||
self.entry["last_access_time"], | ||
self.entry["last_write_time"], | ||
self.entry["last_write_time"], | ||
atime, | ||
mtime, | ||
ctime, | ||
] | ||
return fsutil.stat_result(st_info) | ||
|
||
def readlink(self): | ||
raise NotImplementedError() | ||
|
||
def readlink_ext(self): | ||
raise NotImplementedError() | ||
|
||
def attr(self): | ||
raise TypeError(f"attr is not allowed on CbFilesystemEntry: {self.path}") | ||
|
||
def lattr(self): | ||
raise TypeError(f"lattr is not allowed on CbFilesystemEntry: {self.path}") | ||
def _parse_ts(ts: str) -> datetime: | ||
return datetime.strptime(ts, CB_TIMEFORMAT).replace(tzinfo=timezone.utc) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.