Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rustvmm_gen: Introduce rustvmm_gen #177

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added scripts/lib/__init__.py
Empty file.
129 changes: 129 additions & 0 deletions scripts/lib/kernel_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import re
import tarfile
import requests
import subprocess
import tempfile

# Base URL of the kernel.org CDN. The functions in this module append a
# per-major-series directory (`v{major}.x/`) and the tarball file name to it.
KERNEL_ORG_CDN = "https://cdn.kernel.org/pub/linux/kernel"


def check_kernel_version(version):
    """
    Validate if the input kernel version exists in remote. Supports both X.Y
    (namely X.Y.0 and .0 should be omitted) and X.Y.Z formats

    The directory listing of the matching `v{major}.x/` series is fetched from
    the kernel.org CDN and searched for the `linux-{version}.tar.xz` file name.

    Args:
        version: Kernel version string such as "6.12" or "6.12.8".

    Raises:
        ValueError: `version` is not a string in X.Y or X.Y.Z form.
        RuntimeError: the series or the tarball does not exist remotely, or
            the request failed (HTTP error, timeout, other network error).
    """
    # `fullmatch` rejects a trailing newline, which `$` with `re.match` would
    # let through and which would then end up inside the tarball name.
    if not isinstance(version, str) or not re.fullmatch(
        r"\d+\.\d+(\.\d+)?", version
    ):
        raise ValueError("Invalid version format. Use X.Y or X.Y.Z")

    main_ver = version.split(".")[0]
    base_url = f"{KERNEL_ORG_CDN}/v{main_ver}.x/"
    tarball = f"linux-{version}.tar.xz"

    try:
        # Fetch content of `base_url`
        response = requests.get(base_url, timeout=15)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            raise RuntimeError(
                f"Kernel series v{main_ver}.x does not exist"
            ) from e
        raise RuntimeError(
            f"HTTP error ({e.response.status_code}): {str(e)}"
        ) from e
    except requests.exceptions.Timeout as e:
        raise RuntimeError("Connection timeout while checking version") from e
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Network error: {str(e)}") from e

    # Look for the tarball file name in the directory listing
    if tarball not in response.text:
        raise RuntimeError(f"Kernel version {version} not found in remote")

    print(f"Kernel version {version} found in remote")


def create_temp_dir(version):
    """
    Create a persistent working directory under `/tmp` for a kernel version.

    The directory is named `linux-{version}-source-<random suffix>` and is not
    removed automatically; the caller owns its lifetime.

    Args:
        version: Kernel version string used in the directory name prefix.

    Returns:
        Path of the newly created directory.

    Raises:
        RuntimeError: the directory could not be created.
    """
    prefix = f"linux-{version}-source-"
    try:
        # `mkdtemp` never deletes the directory, which matches the intent of
        # `TemporaryDirectory(delete=False)` without requiring Python 3.12+.
        return tempfile.mkdtemp(prefix=prefix, dir="/tmp")
    except OSError as e:
        raise RuntimeError(f"Failed to create temp directory: {e}") from e


def download_kernel(version, temp_dir):
    """
    Download the `linux-{version}.tar.xz` tarball from the kernel.org CDN.

    The response is streamed to disk in 8 KiB chunks, and a percentage is
    printed when the server reports a content length.

    Args:
        version: Kernel version string in X.Y or X.Y.Z form.
        temp_dir: Existing directory the tarball is written into.

    Returns:
        Path of the downloaded tarball inside `temp_dir`.

    Raises:
        ValueError: `version` is not a string in X.Y or X.Y.Z form.
        RuntimeError: the request or the write to disk failed.
    """
    # Validate up front so a malformed version gives a clear error rather
    # than an AttributeError on a failed regex match.
    parsed = (
        re.fullmatch(r"(\d+)\.\d+(\.\d+)?", version)
        if isinstance(version, str)
        else None
    )
    if parsed is None:
        raise ValueError("Invalid version format. Use X.Y or X.Y.Z")

    version_major = parsed.group(1)
    url = f"{KERNEL_ORG_CDN}/v{version_major}.x/linux-{version}.tar.xz"
    tarball_path = os.path.join(temp_dir, f"linux-{version}.tar.xz")
    print(f"Downloading {url} to {tarball_path}")

    try:
        # The timeout bounds the connect and each read, not the whole
        # transfer; without it a stalled connection would block forever.
        with requests.get(url, stream=True, timeout=15) as response:
            response.raise_for_status()
            total_size = int(response.headers.get("content-length", 0))
            downloaded = 0

            with open(tarball_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        progress = downloaded / total_size * 100
                        print(f"\rDownloading: {progress:.1f}%", end="")
            # Terminate the carriage-return progress line
            print()
            return tarball_path
    except Exception as e:
        raise RuntimeError(f"Download failed: {e}") from e


def extract_kernel(tarball_path, temp_dir):
    """
    Extract an xz-compressed kernel tarball into `temp_dir`.

    Args:
        tarball_path: Path to a `<name>.tar.xz` archive.
        temp_dir: Directory to extract into.

    Returns:
        `temp_dir/<name>`, where `<name>` is the tarball file name up to
        `.tar`. The archive is expected to unpack into a directory of that
        name; this is not verified here.

    Raises:
        RuntimeError: the archive could not be opened or extracted.
    """
    print("Extracting...")
    try:
        with tarfile.open(tarball_path, "r:xz") as tar:
            # The archive comes from the network: where the interpreter
            # supports extraction filters, use "data" so members cannot
            # escape `temp_dir` or create special files.
            if hasattr(tarfile, "data_filter"):
                tar.extractall(path=temp_dir, filter="data")
            else:
                tar.extractall(path=temp_dir)
        extract_path = os.path.join(
            temp_dir, f"{os.path.basename(tarball_path).split('.tar')[0]}"
        )
        print(f"Extracted to {extract_path}")
        return extract_path
    except (tarfile.TarError, IOError) as e:
        raise RuntimeError(f"Extraction failed: {e}") from e


def install_headers(src_dir, arch, install_path):
    """
    Run `make headers_install` in a kernel source tree.

    Args:
        src_dir: Path to the extracted kernel source tree.
        arch: Value passed to make as `ARCH=`.
        install_path: Directory passed (as an absolute path) to make as
            `INSTALL_HDR_PATH=`. When None, `<parent of src_dir>/{arch}_headers`
            is used.

    Returns:
        The install path that was used (as given, or the default).

    Raises:
        RuntimeError: the install directory could not be created, `make`
            could not be started, or `make` exited with a non-zero status.
    """
    parent_dir = os.path.dirname(src_dir)
    if install_path is None:
        install_path = os.path.join(parent_dir, f"{arch}_headers")
    abs_install_path = os.path.abspath(install_path)

    try:
        os.makedirs(install_path, exist_ok=True)

        print(f"Installing to {abs_install_path}")
        result = subprocess.run(
            [
                "make",
                "-C",
                f"{src_dir}",
                f"ARCH={arch}",
                f"INSTALL_HDR_PATH={abs_install_path}",
                "headers_install",
            ],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # stderr is captured separately, so `e.output` (stdout) alone would
        # omit it; include both streams in the message.
        raise RuntimeError(
            f"Header installation failed:\n{e.stdout}{e.stderr}\n"
            f"Temporary files kept at: {parent_dir}"
        ) from e
    except OSError as e:
        # e.g. `make` is not installed or the install directory is not writable
        raise RuntimeError(f"Header installation failed: {e}") from e

    print(result.stdout)
    return install_path
55 changes: 55 additions & 0 deletions scripts/lib/syscall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import subprocess
import re


def generate_syscall_table(file_path):
    """
    Build the syscall table entries from a kernel header file.

    Every line that, once stripped, starts with `#define __NR_<name> <number>`
    contributes one `("<name>", <number>),` entry. Entries are ordered by
    syscall name and joined with single spaces.

    Raises:
        RuntimeError: the header file is missing or could not be processed.
    """
    nr_define = re.compile(r"^#define __NR_(\w+)\s+(\d+)")
    try:
        with open(file_path, "r") as header:
            hits = (nr_define.match(raw.strip()) for raw in header)
            entries = [(m.group(1), int(m.group(2))) for m in hits if m]

            # Order alphabetically by syscall name
            ordered = sorted(entries, key=lambda entry: entry[0])
            return " ".join(f'("{name}", {num}),' for name, num in ordered)

    except FileNotFoundError:
        raise RuntimeError(f"Header file not found: {file_path}")
    except Exception as e:
        raise RuntimeError(f"File processing failed: {str(e)}")


def generate_rust_code(syscalls, output_path):
    """
    Generate Rust code and format with rustfmt

    Writes a Rust source file defining `make_syscall_table()`, which collects
    the given entries into a `HashMap<&'static str, i64>`, then runs `rustfmt`
    on that file in place.

    Args:
        syscalls: Pre-rendered `("name", number),` entries to splice into the
            `vec![...]` literal.
        output_path: Path of the Rust file to write.

    Raises:
        RuntimeError: the file could not be written, `rustfmt` could not be
            started, or `rustfmt` exited with a non-zero status.
    """
    print(f"Generating to: {output_path}")
    code = f"""use std::collections::HashMap;
pub(crate) fn make_syscall_table() -> HashMap<&'static str, i64> {{
vec![
{syscalls}
].into_iter().collect()
}}
"""
    try:
        with open(output_path, "w") as f:
            f.write(code)
    except OSError as e:
        raise RuntimeError(f"File write error: {str(e)}") from e

    # Kept separate from the write above: a missing `rustfmt` binary raises
    # FileNotFoundError (an OSError) and must not be reported as a write error.
    try:
        subprocess.run(["rustfmt", output_path], check=True)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("rustfmt formatting failed") from e
    except OSError as e:
        raise RuntimeError(f"Failed to run rustfmt: {e}") from e

    print(f"Generation succeeded: {output_path}")
101 changes: 101 additions & 0 deletions scripts/rustvmm_gen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env python3
#
# Copyright 2025 © Institute of Software, CAS. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import argparse
import os
from pathlib import Path
from lib.kernel_source import (
check_kernel_version,
create_temp_dir,
download_kernel,
extract_kernel,
install_headers,
)
from lib.syscall import (
generate_syscall_table,
generate_rust_code,
)

# Map the Linux kernel architecture name (the value given to `--arch` and
# passed to make as `ARCH=`) to the Rust architecture name used for the
# generated `<arch>.rs` file.
MAP_RUST_ARCH = {"arm64": "aarch64", "x86_64": "x86_64", "riscv": "riscv64"}


def prepare_command(args):
    """
    Handle the `prepare` subcommand: fetch a kernel and install its headers.

    Checks that `args.version` exists remotely, downloads and unpacks the
    tarball into a fresh directory under `/tmp`, then installs the headers
    for `args.arch` into `args.install_path`.

    Returns:
        Path of the extracted kernel source tree.
    """
    kernel_version = args.version
    check_kernel_version(kernel_version)

    # Working directory under `/tmp` holding the tarball and the sources
    workdir = create_temp_dir(kernel_version)

    # Fetch the tarball from https://cdn.kernel.org/ and unpack it
    archive = download_kernel(kernel_version, workdir)
    src_dir = extract_kernel(archive, workdir)

    # Install the headers for the requested architecture
    headers_dir = install_headers(
        src_dir=src_dir,
        arch=args.arch,
        install_path=args.install_path,
    )
    print(f"\nSuccessfully installed kernel headers to {headers_dir}")

    return src_dir


def generate_syscall_command(args):
    """
    Handle the `generate_syscall` subcommand.

    Prepares the kernel headers, parses `include/asm/unistd_64.h` from the
    installed headers, and writes the syscall table to
    `<args.output_path>/<rust arch>.rs`.

    Raises:
        ValueError: `args.arch` has no entry in `MAP_RUST_ARCH`.
    """
    # Fail before the download and header install rather than with a
    # KeyError after all that work is done.
    if args.arch not in MAP_RUST_ARCH:
        supported = ", ".join(sorted(MAP_RUST_ARCH))
        raise ValueError(
            f"Unsupported architecture '{args.arch}'. Supported: {supported}"
        )

    src_dir = prepare_command(args)

    # Headers live in the user-supplied install path when one was given;
    # otherwise in the default `<parent of src_dir>/{arch}_headers` location.
    headers_dir = args.install_path
    if headers_dir is None:
        headers_dir = os.path.join(
            os.path.dirname(src_dir), f"{args.arch}_headers"
        )

    # Generate syscall table
    header_path = os.path.join(headers_dir, "include/asm/unistd_64.h")
    syscalls = generate_syscall_table(header_path)

    # Create output directory if needed
    args.output_path.mkdir(parents=True, exist_ok=True)

    # Generate architecture-specific filename
    output_file_path = args.output_path / f"{MAP_RUST_ARCH[args.arch]}.rs"

    # Generate Rust code
    generate_rust_code(syscalls, output_file_path)


def main():
    """
    Parse the command line and dispatch to the selected subcommand.

    Global options (`--arch`, `--version`, `--install_path`, `--keep`) belong
    to the top-level parser, so they go before the subcommand name on the
    command line.
    """
    parser = argparse.ArgumentParser(prog="rustvmm_gen")
    subparsers = parser.add_subparsers(dest="command", required=True)
    # Both subcommands need an architecture and a version; require them here
    # so a missing option is an argparse usage error, not a crash on `None`.
    parser.add_argument(
        "--arch",
        required=True,
        help="Target architecture as named by the kernel (x86_64, arm64, riscv)",
    )
    parser.add_argument(
        "--version", required=True, help="Kernel version (e.g. 6.12.8)"
    )
    parser.add_argument(
        "--install_path",
        default=None,
        help="Header installation directory path",
    )
    # NOTE(review): `--keep` takes a value and is not read by either
    # subcommand handler in this script; confirm the intended semantics.
    parser.add_argument("--keep", help="Keep temporary build files")

    # Prepare subcommand
    prepare_parser = subparsers.add_parser("prepare", help="Prepare kernel headers")
    prepare_parser.set_defaults(func=prepare_command)

    # Generate syscall subcommand
    generate_syscall_parser = subparsers.add_parser(
        "generate_syscall",
        help="Generate syscall for `rust-vmm/seccompiler` from prepared kernel headers",
    )
    generate_syscall_parser.add_argument(
        "--output_path",
        type=Path,
        default=os.getcwd(),
        help="Output directory path (default: current)",
    )
    generate_syscall_parser.set_defaults(func=generate_syscall_command)

    args = parser.parse_args()
    args.func(args)


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()