Skip to content

Commit

Permalink
Python c api (#29)
Browse files Browse the repository at this point in the history
* Update injector
* Add uninject flag
* Disaable CI on push
* Bump version
  • Loading branch information
kmaork authored Jul 18, 2023
1 parent 6ef6ac3 commit 8bc176c
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 107 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ on:
type: boolean
release:
types: [ created ]
push:
pull_request:

jobs:
Expand Down
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[submodule "injector"]
path = injector
url = https://github.com/kubo/injector.git
url = https://github.com/kmaork/injector.git
4 changes: 3 additions & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Must include all injector headers and source files, so the source distribution could be installed on any platform,
# regardless of the platform where it was built.
recursive-include injector *.h
recursive-include injector/src *.c
include LICENSE
include LICENSE
2 changes: 1 addition & 1 deletion injector
2 changes: 0 additions & 2 deletions libinjector.c

This file was deleted.

4 changes: 3 additions & 1 deletion pyinjector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .api import inject, LibraryNotFoundException, InjectorError
from .api import inject, attach, PyInjectorError, LibraryNotFoundException, InjectorError

from types import ModuleType

Expand All @@ -13,3 +13,5 @@ def legacy_pyinjector_import():


legacy_pyinjector_import()

__all__ = ['inject', 'attach', 'PyInjectorError', 'LibraryNotFoundException', 'InjectorError']
5 changes: 3 additions & 2 deletions pyinjector/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ def parse_args(args: Optional[List[str]]) -> Namespace:
parser = ArgumentParser(description='Inject a dynamic library to a running process.')
parser.add_argument('pid', type=int, help='pid of the process to inject the library into')
parser.add_argument('library_path', type=str.encode, help='path of the library to inject')
parser.add_argument('-u', '--uninject', action='store_true', help='Whether to uninject the library after injection')
return parser.parse_args(args)


def main(args: Optional[List[str]] = None) -> None:
parsed_args = parse_args(args)
try:
handle = inject(parsed_args.pid, parsed_args.library_path)
handle = inject(parsed_args.pid, parsed_args.library_path, parsed_args.uninject)
except PyInjectorError as e:
print("pyinjector failed to inject: {}".format(e), file=sys.stderr)
print(f"pyinjector failed to inject: {e}", file=sys.stderr)
else:
print(f"Handle: {handle}")

Expand Down
113 changes: 43 additions & 70 deletions pyinjector/api.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
from __future__ import annotations
import os
from importlib.util import find_spec
from ctypes import CDLL, Structure, POINTER, c_int32, byref, c_char_p, c_void_p, pointer
from typing import AnyStr, Callable, Any, Mapping, Type, Optional, Tuple
from contextlib import contextmanager
from typing import AnyStr, Optional
from sys import platform

libinjector_path = find_spec('.libinjector', __package__).origin
libinjector = CDLL(libinjector_path)

injector_t = type('injector_t', (Structure,), {})
injector_pointer_t = POINTER(injector_t)
pid_t = c_int32

libinjector.injector_attach.argtypes = POINTER(injector_pointer_t), pid_t
libinjector.injector_attach.restype = c_int32
libinjector.injector_inject.argtypes = injector_pointer_t, c_char_p, POINTER(c_void_p)
libinjector.injector_inject.restype = c_int32
libinjector.injector_detach.argtypes = injector_pointer_t,
libinjector.injector_detach.restype = c_int32
libinjector.injector_error.argtypes = ()
libinjector.injector_error.restype = c_char_p
from .injector import Injector, InjectorException


class PyInjectorError(Exception):
Expand All @@ -34,10 +20,10 @@ def __str__(self):


class InjectorError(PyInjectorError):
def __init__(self, func_name: str, ret_val: int, error_str: Optional[bytes]):
def __init__(self, func_name: str, ret_val: int, error_str: Optional[str]):
self.func_name = func_name
self.ret_val = ret_val
self.error_str = error_str.decode()
self.error_str = error_str

def _get_extra_explanation(self):
return None
Expand All @@ -46,8 +32,8 @@ def __str__(self):
extra = self._get_extra_explanation()
explanation = \
'see error code definition in injector/include/injector.h' if self.error_str is None else \
(self.error_str if extra is None else '{}\n{}'.format(self.error_str, extra))
return '{} returned {}: {}'.format(self.func_name, self.ret_val, explanation)
(self.error_str if extra is None else f'{self.error_str}\n{extra}')
return f'Injector failed with {self.ret_val} calling {self.func_name}: {explanation}'


class LinuxInjectorPermissionError(InjectorError):
Expand All @@ -66,59 +52,46 @@ class MacUnknownInjectorError(InjectorError):
def _get_extra_explanation(self):
issue_link = "https://github.com/kmaork/pyinjector/issues/26"
return (
"""Mac restricts injection for security reasons. Please report this error in the issue:
{}""".format(issue_link)
f"Mac restricts injection for security reasons. Please report this error in the issue:\n{issue_link}"
if os.geteuid() == 0 else
"""Mac restricts injection for security reasons. Please try rerunning as root.
f"""Mac restricts injection for security reasons. Please try rerunning as root.
If you need to inject without root permissions, please report here:
{}""".format(issue_link)
{issue_link}"""
)


def call_c_func(func: Callable[..., int], *args: Any,
exception_map: Mapping[Tuple[int, str], Type[InjectorError]] = None) -> None:
ret = func(*args)
if ret != 0:
exception_map = {} if exception_map is None else exception_map
exception_cls = exception_map.get((ret, platform), InjectorError)
raise exception_cls(func.__name__, ret, libinjector.injector_error())


class Injector:
def __init__(self, injector_p: injector_pointer_t):
self.injector_p = injector_p

@classmethod
def attach(cls, pid: int) -> 'Injector':
assert isinstance(pid, int)
injector_p = injector_pointer_t()
call_c_func(libinjector.injector_attach, byref(injector_p), pid,
exception_map={(-8, 'linux'): LinuxInjectorPermissionError,
(-1, 'darwin'): MacUnknownInjectorError})
return cls(injector_p)

def inject(self, library_path: AnyStr) -> int:
if isinstance(library_path, str):
library_path = library_path.encode()
assert isinstance(library_path, bytes)
assert os.path.isfile(library_path), f'Library not found at "{library_path.decode()}"'
handle = c_void_p()
call_c_func(libinjector.injector_inject, self.injector_p, library_path, pointer(handle))
return handle.value

def detach(self) -> None:
call_c_func(libinjector.injector_detach, self.injector_p)


def inject(pid: int, library_path: AnyStr) -> int:
@contextmanager
def attach(pid: int):
exception_map = {(-8, 'linux'): LinuxInjectorPermissionError,
(-1, 'darwin'): MacUnknownInjectorError}
injector = Injector()
try:
injector.attach(pid)
try:
yield injector
finally:
injector.detach()
except InjectorException as e:
func_name, ret_val, error_str = e.args
exception_cls = exception_map.get((ret_val, platform), InjectorError)
raise exception_cls(func_name, ret_val, error_str) from e


def inject(pid: int, library_path: AnyStr, uninject: bool = False) -> int:
"""
Inject the shared library at library_path to the process (or thread) with the given pid.
Return the handle to the loaded library.
If uninject is True, the library will be unloaded after injection.
Return the handle to the injected library.
"""
if not os.path.isfile(library_path):
raise LibraryNotFoundException(library_path)
injector = Injector.attach(pid)
try:
return injector.inject(library_path)
finally:
injector.detach()
if isinstance(library_path, str):
encoded_library_path = library_path.encode()
else:
encoded_library_path = library_path
assert isinstance(encoded_library_path, bytes)
if not os.path.isfile(encoded_library_path):
raise LibraryNotFoundException(encoded_library_path)
with attach(pid) as injector:
handle = injector.inject(encoded_library_path)
if uninject:
injector.uninject(handle)
return handle
176 changes: 176 additions & 0 deletions pyinjector/injector.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
#include <Python.h>
#include <stddef.h>
#include "injector.h"
#include <structmember.h>

static PyObject *InjectorException;

typedef struct {
PyObject_HEAD
injector_t *injector;
} Injector;

void Injector_raise(char* func_name, int result)
{
PyObject* error_args = Py_BuildValue("sis", func_name, result, injector_error());
PyErr_SetObject(InjectorException, error_args);
Py_DECREF(error_args);
}

static PyObject *Injector_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
Injector *self = (Injector *)type->tp_alloc(type, 0);
if (self == NULL) {
return NULL;
}
self->injector = NULL;
return (PyObject *)self;
}

static PyObject *Injector_attach(Injector *self, PyObject *args)
{
injector_pid_t pid;
int result;

if (!PyArg_ParseTuple(args, "i", &pid)) {
return NULL;
}

result = injector_attach(&(self->injector), pid);
if (result != INJERR_SUCCESS) {
Injector_raise("injector_attach", result);
return NULL;
}

Py_RETURN_NONE;
}

static PyObject *Injector_inject(Injector *self, PyObject *args)
{
Py_buffer path_buffer;
void *handle;
int result;

if (!PyArg_ParseTuple(args, "y*", &path_buffer)) {
return NULL;
}

result = injector_inject(self->injector, path_buffer.buf, &handle);
PyBuffer_Release(&path_buffer);
if (result != INJERR_SUCCESS) {
Injector_raise("injector_inject", result);
return NULL;
}

return PyLong_FromVoidPtr(handle);
}

#if defined(__APPLE__) || defined(__linux)
static PyObject *Injector_call(Injector *self, PyObject *args)
{
void *handle;
const char *name;
int result;

if (!PyArg_ParseTuple(args, "Ks", &handle, &name)) {
return NULL;
}

result = injector_call(self->injector, handle, name);
if (result != INJERR_SUCCESS) {
Injector_raise("injector_call", result);
return NULL;
}

Py_RETURN_NONE;
}
#endif

static PyObject *Injector_uninject(Injector *self, PyObject *args)
{
void *handle;
int result;

if (!PyArg_ParseTuple(args, "K", &handle)) {
return NULL;
}

result = injector_uninject(self->injector, handle);
if (result != INJERR_SUCCESS) {
Injector_raise("injector_uninject", result);
return NULL;
}

Py_RETURN_NONE;
}

static PyObject *Injector_detach(Injector *self, PyObject *args)
{
int result = injector_detach(self->injector);
if (result != INJERR_SUCCESS) {
Injector_raise("injector_detach", result);
return NULL;
}
// injector_detach frees the injector
self->injector = NULL;
Py_RETURN_NONE;
}

static PyMethodDef Injector_methods[] = {
{"attach", (PyCFunction)Injector_attach, METH_VARARGS, "Attach the injector to a process."},
{"inject", (PyCFunction)Injector_inject, METH_VARARGS, "Inject a shared library into the process."},
#if defined(__APPLE__) || defined(__linux)
{"call", (PyCFunction)Injector_call, METH_VARARGS, NULL},
#endif
{"uninject", (PyCFunction)Injector_uninject, METH_VARARGS, "Uninject a previously injected library."},
{"detach", (PyCFunction)Injector_detach, METH_NOARGS, "Detach the injector from the process."},
{NULL} /* Sentinel */
};

static PyTypeObject InjectorType = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "injector.Injector",
.tp_doc = "Low level wrapper for injector functions",
.tp_basicsize = sizeof(Injector),
.tp_itemsize = 0,
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.tp_new = Injector_new,
.tp_methods = Injector_methods,
};

static PyModuleDef injectormodule = {
PyModuleDef_HEAD_INIT,
.m_name = "injector",
.m_doc = "Python wrapper for the injector library.",
.m_size = -1,
};

PyMODINIT_FUNC PyInit_injector(void)
{
PyObject* m;

if (PyType_Ready(&InjectorType) < 0)
return NULL;

m = PyModule_Create(&injectormodule);
if (m == NULL)
return NULL;

Py_INCREF(&InjectorType);
if (PyModule_AddObject(m, "Injector", (PyObject *)&InjectorType) < 0) {
Py_DECREF(&InjectorType);
Py_DECREF(m);
return NULL;
}

InjectorException = PyErr_NewException("injector.InjectorException", NULL, NULL);
Py_INCREF(InjectorException);
if (PyModule_AddObject(m, "InjectorException", InjectorException) < 0) {
Py_DECREF(InjectorException);
Py_DECREF(&InjectorType);
Py_DECREF(m);
return NULL;
}

return m;
}
16 changes: 16 additions & 0 deletions pyinjector/injector.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any, Optional, Tuple
from sys import platform


class Injector:
def __init__(self) -> None: ...
def attach(self, pid: int) -> None: ...
def inject(self, path: bytes) -> int: ...
def uninject(self, handle: int) -> None: ...
def detach(self) -> None: ...
if platform == "linux" or platform == "darwin":
def call(self, handle: Any, name: str) -> None: ...


class InjectorException(Exception):
args: Tuple[str, int, Optional[str]]
Loading

0 comments on commit 8bc176c

Please sign in to comment.