From a27766176cee69f3c3a8d5418eda725ca3b763dc Mon Sep 17 00:00:00 2001 From: hugsy Date: Sun, 3 Nov 2024 11:45:36 -0800 Subject: [PATCH] checkpoint: added new remote modes, gdbserver & gdbserver-multi work --- gef.py | 194 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 122 insertions(+), 72 deletions(-) diff --git a/gef.py b/gef.py index 08bed01fd..41cb61da6 100644 --- a/gef.py +++ b/gef.py @@ -273,11 +273,13 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper -class ValidationError(Exception): pass # # Helpers # +class ValidationError(Exception): pass + +class InitializationError(Exception): pass class ObsoleteException(Exception): pass @@ -347,10 +349,10 @@ def is_alive() -> bool: return False -def calling_function() -> str | None: +def calling_function(frame: int = 3) -> str | None: """Return the name of the calling function""" try: - stack_info = traceback.extract_stack()[-3] + stack_info = traceback.extract_stack()[-frame] return stack_info.name except Exception as e: dbg(f"traceback failed with {str(e)}") @@ -3512,30 +3514,57 @@ def get_os() -> str: return gef.session.os -@lru_cache() -def is_qemu() -> bool: - if not is_remote_debug(): +def is_target_remote(conn: gdb.TargetConnection | None = None) -> bool: + "Returns True for `extended-remote` only." + _conn = conn or gdb.selected_inferior().connection + return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "remote" + + +def is_target_extended_remote(conn: gdb.TargetConnection | None = None) -> bool: + "Returns True for `extended-remote` only." + _conn = conn or gdb.selected_inferior().connection + return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "extended-remote" + + +def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bool: + return is_target_remote(conn) or is_target_extended_remote(conn) + + +def is_running_under_qemu() -> bool: + "See https://www.qemu.org/docs/master/system/gdb.html " + if not is_target_remote(): return False response = gdb.execute("maintenance packet Qqemu.sstepbits", to_string=True, from_tty=False) or "" return "ENABLE=" in response -@lru_cache() -def is_qemu_usermode() -> bool: - if not is_qemu(): +def is_running_under_qemu_user() -> bool: + if not is_running_under_qemu(): return False - response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" + response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" # Use `qAttached`? return "Text=" in response -@lru_cache() -def is_qemu_system() -> bool: - if not is_qemu(): +def is_running_under_qemu_system() -> bool: + if not is_running_under_qemu(): return False + # Use "maintenance packet qqemu.PhyMemMode"? response = gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" return "received: \"\"" in response +def is_running_in_gdbserver() -> bool: + if is_running_under_qemu(): + return False + return not is_running_under_qemu() + + +def is_running_in_rr() -> bool: + if not is_running_in_gdbserver(): + return False + return os.environ.get("GDB_UNDER_RR", None) == "1" + + def get_filepath() -> str | None: """Return the local absolute path of the file currently debugged.""" if gef.session.remote: @@ -3960,6 +3989,7 @@ def is_in_x86_kernel(address: int) -> bool: return (address >> memalign) == 0xF +@deprecated("Use `is_target_remote()`") def is_remote_debug() -> bool: """"Return True is the current debugging session is running through GDB remote session.""" return gef.session.remote_initializing or gef.session.remote is not None @@ -6294,7 +6324,10 @@ def do_invoke(self, _: list[str], **kwargs: Any) -> None: # calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None # This prevents some spurious errors being thrown during startup gef.session.remote_initializing = True - session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) + # session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) + conn = gdb.selected_inferior().connection + assert isinstance(conn, gdb.RemoteTargetConnection) + session = GefRemoteSessionManager(conn) dbg(f"[remote] initializing remote session with {session.target} under {session.root}") if not session.connect(args.pid) or not session.setup(): @@ -7414,7 +7447,7 @@ def __init__(self) -> None: def do_invoke(self, _: list[str], **kwargs: Any) -> None: args : argparse.Namespace = kwargs["arguments"] - if is_qemu_system(): + if is_running_under_qemu_system(): err("Unsupported") return @@ -11312,7 +11345,7 @@ def __repr__(self) -> str: def auxiliary_vector(self) -> dict[str, int] | None: if not is_alive(): return None - if is_qemu_system(): + if is_running_under_qemu_system(): return None if not self._auxiliary_vector: auxiliary_vector = {} @@ -11431,6 +11464,9 @@ class RemoteMode(enum.IntEnum): GDBSERVER = 0 QEMU = 1 RR = 2 + GDBSERVER_MULTI = 3 + QEMU_USER = 4 + QEMU_SYSTEM = 5 def __str__(self): return self.name @@ -11440,30 +11476,48 @@ def __repr__(self): def prompt_string(self) -> str: match self: - case GefRemoteSessionManager.RemoteMode.QEMU: + case ( + GefRemoteSessionManager.RemoteMode.QEMU | + GefRemoteSessionManager.RemoteMode.QEMU_USER | + GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM + ): return Color.boldify("(qemu) ") case GefRemoteSessionManager.RemoteMode.RR: return Color.boldify("(rr) ") - case GefRemoteSessionManager.RemoteMode.GDBSERVER: + case ( + GefRemoteSessionManager.RemoteMode.GDBSERVER | + GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI + ): return Color.boldify("(remote) ") raise AttributeError("Unknown value") - def __init__(self, host: str, port: int, pid: int =-1, qemu: pathlib.Path | None = None) -> None: + @staticmethod + def init() -> "GefRemoteSessionManager.RemoteMode": + if is_running_under_qemu_system(): + return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM + if is_running_under_qemu_user(): + return GefRemoteSessionManager.RemoteMode.QEMU_USER + if is_running_in_rr(): + return GefRemoteSessionManager.RemoteMode.RR + if is_running_in_gdbserver(): + if is_target_extended_remote(): + return GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI + return GefRemoteSessionManager.RemoteMode.GDBSERVER + raise AttributeError + + def __init__(self, conn: gdb.RemoteTargetConnection) -> None: super().__init__() - self.__host = host - self.__port = port + assert is_target_remote_or_extended() + remote_host = conn.details + assert remote_host + host, port = remote_host.split(":", 1) + self.__host = host or "localhost" + self.__port = int(port) self.__local_root_fd = tempfile.TemporaryDirectory() self.__local_root_path = pathlib.Path(self.__local_root_fd.name) - self.__qemu = qemu - if pid > 0: - self._pid = pid + self._mode = GefRemoteSessionManager.RemoteMode.init() - if self.__qemu is not None: - self._mode = GefRemoteSessionManager.RemoteMode.QEMU - elif os.environ.get("GDB_UNDER_RR", None) == "1": - self._mode = GefRemoteSessionManager.RemoteMode.RR - else: - self._mode = GefRemoteSessionManager.RemoteMode.GDBSERVER + self.setup() def close(self) -> None: self.__local_root_fd.cleanup() @@ -11475,7 +11529,11 @@ def close(self) -> None: raise def __str__(self) -> str: - return f"RemoteSession(target='{self.target}', local='{self.root}', pid={self.pid}, mode={self.mode})" + msg = f"RemoteSession(target='{self.target}', local='{self.root}', mode={self.mode}" + if self.mode == GefRemoteSessionManager.RemoteMode.GDBSERVER: + msg += f", pid={self.pid}" + msg += ")" + return msg def __repr__(self) -> str: return str(self) @@ -11561,16 +11619,18 @@ def connect(self, pid: int) -> bool: def setup(self) -> bool: # setup remote adequately depending on remote or qemu mode + dbg(f"Setting up the {self._mode} session") match self.mode: - case GefRemoteSessionManager.RemoteMode.QEMU: - dbg(f"Setting up as qemu session, target={self.__qemu}") - self.__setup_qemu() + case GefRemoteSessionManager.RemoteMode.QEMU_USER: + self.__setup_qemu_user() + case GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM: + raise Exception("TODO") case GefRemoteSessionManager.RemoteMode.RR: - dbg("Setting up as rr session") self.__setup_rr() case GefRemoteSessionManager.RemoteMode.GDBSERVER: - dbg("Setting up as remote session") self.__setup_remote() + case GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI: + pass case _: raise ValueError @@ -11580,13 +11640,13 @@ def setup(self) -> bool: reset_architecture() return True - def __setup_qemu(self) -> bool: + def __setup_qemu_user(self) -> bool: # setup emulated file in the chroot - assert self.__qemu - target = self.root / str(self.__qemu.parent).lstrip("/") + __qemu_target = pathlib.Path("foo") # TODO + target = self.root / str(__qemu_target.parent).lstrip("/") target.mkdir(parents=True, exist_ok=False) - shutil.copy2(self.__qemu, target) - self._file = self.__qemu + shutil.copy2(__qemu_target, target) + self._file = __qemu_target assert self.lfile.exists() # create a procfs @@ -11767,12 +11827,20 @@ def reset_caches(self) -> None: def target_remote_posthook(): - if gef.session.remote_initializing: - return + print(f"{is_target_remote()=}") + print(f"{is_target_remote_or_extended()=}") + print(f"{is_target_extended_remote()=}") + print(f"{is_running_under_qemu()=}") + print(f"{is_running_under_qemu_system()=}") + print(f"{is_running_under_qemu_user()=}") + print(f"{is_running_in_gdbserver()=}") + print(f"{is_running_in_rr()=}") + conn = gdb.selected_inferior().connection + if not isinstance(conn, gdb.RemoteTargetConnection): + raise TypeError("Expected type gdb.RemoteTargetConnection") + assert is_target_remote_or_extended(conn), "Target is not remote" + gef.session.remote = GefRemoteSessionManager(conn) - gef.session.remote = GefRemoteSessionManager("", 0) - if not gef.session.remote.setup(): - raise EnvironmentError(f"Failed to create a proper environment for {gef.session.remote}") if __name__ == "__main__": if sys.version_info[0] == 2: @@ -11835,32 +11903,14 @@ def target_remote_posthook(): GefTmuxSetup() - if GDB_VERSION > (9, 0): - disable_tr_overwrite_setting = "gef.disable_target_remote_overwrite" - - if not gef.config[disable_tr_overwrite_setting]: - warnmsg = ("Using `target remote` with GEF should work in most cases, " - "but use `gef-remote` if you can. You can disable the " - "overwrite of the `target remote` command by toggling " - f"`{disable_tr_overwrite_setting}` in the config.") - hook = f""" - define target hookpost-{{}} - pi target_remote_posthook() - context - pi if calling_function() != "connect": warn("{warnmsg}") - end - """ - - # Register a post-hook for `target remote` that initialize the remote session - gdb.execute(hook.format("remote")) - gdb.execute(hook.format("extended-remote")) - else: - errmsg = ("Using `target remote` does not work, use `gef-remote` " - f"instead. You can toggle `{disable_tr_overwrite_setting}` " - "if this is not desired.") - hook = f"""pi if calling_function() != "connect": err("{errmsg}")""" - gdb.execute(f"define target hook-remote\n{hook}\nend") - gdb.execute(f"define target hook-extended-remote\n{hook}\nend") + # Initialize `target *remote` post hooks + hook = """ + define target hookpost-{0} + pi target_remote_posthook() + end + """ + gdb.execute(hook.format("remote")) + gdb.execute(hook.format("extended-remote")) # restore saved breakpoints (if any) bkp_fpath = pathlib.Path(gef.config["gef.autosave_breakpoints_file"]).expanduser().absolute()