Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extractor support for .pkg and .zst packages #1580

Merged
merged 9 commits into from
Mar 30, 2022
41 changes: 41 additions & 0 deletions cve_bin_tool/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import re
import shutil
import sys
import tarfile
import tempfile

import zstandard
from rpmfile.cli import main as rpmextract

from cve_bin_tool.async_utils import (
Expand Down Expand Up @@ -47,6 +49,8 @@ def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace):
self.extract_file_deb: {".deb", ".ipk"},
self.extract_file_cab: {".cab"},
self.extract_file_apk: {".apk"},
self.extract_file_zst: {".zst"},
self.extract_file_pkg: {".pkg"},
self.extract_file_zip: {
".exe",
".zip",
Expand Down Expand Up @@ -109,6 +113,43 @@ async def extract_file_rpm(self, filename, extraction_path):
return 1
return 0

async def extract_file_zst(self, filename, extraction_path):
"""Extract zstd compressed files"""

dctx = zstandard.ZstdDecompressor()
with ErrorHandler(mode=ErrorMode.Ignore) as e:
with open(filename, "rb") as f:
with dctx.stream_reader(f) as reader:
with tarfile.open(mode="r|", fileobj=reader) as tar:
tar.extractall(extraction_path)
tar.close()
return e.exit_code

async def extract_file_pkg(self, filename, extraction_path):
"""Extract pkg files"""
if sys.platform.startswith("linux"):
if not await aio_inpath("tar"):
with ErrorHandler(mode=self.error_mode, logger=self.logger):
raise Exception("tar is required to extract .pkg files")
else:
stdout, stderr, return_code = await aio_run_command(
["tar", "xf", filename, "-C", extraction_path]
)
if (stderr or not stdout) and return_code != 0:
return 1
return 0
else:
if not await aio_inpath("7z"):
with ErrorHandler(mode=self.error_mode, logger=self.logger):
raise Exception("7z is required to extract rpm files")
else:
stdout, stderr, _ = await aio_run_command(
["7z", "x", filename, "-o", extraction_path]
)
if stderr or not stdout:
return 1
return 0

async def extract_file_deb(self, filename, extraction_path):
"""Extract debian packages"""
if not await aio_inpath("ar"):
Expand Down