Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworking handling of config options #592

Merged
merged 8 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies = [
"cryptography >=42.0.5",
"pyyaml >=6.0",
"pyahocorasick >= 2.0.0",
"pydantic >= 2.10.0",
"pydantic-settings >= 2.7.0",
]
requires-python = ">= 3.8"

Expand Down
17 changes: 9 additions & 8 deletions src/mvt/android/modules/backup/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

import os

from rich.prompt import Prompt

from mvt.common.config import settings

MVT_ANDROID_BACKUP_PASSWORD = "MVT_ANDROID_BACKUP_PASSWORD"


Expand All @@ -16,24 +17,24 @@ def cli_load_android_backup_password(log, backup_password):

Used in MVT CLI command parsers.
"""
password_from_env = os.environ.get(MVT_ANDROID_BACKUP_PASSWORD, None)
password_from_env_or_config = settings.ANDROID_BACKUP_PASSWORD
if backup_password:
log.info(
"Your password may be visible in the process table because it "
"was supplied on the command line!"
)
if password_from_env:
if password_from_env_or_config:
log.info(
"Ignoring %s environment variable, using --backup-password argument instead",
MVT_ANDROID_BACKUP_PASSWORD,
"MVT_ANDROID_BACKUP_PASSWORD",
)
return backup_password
elif password_from_env:
elif password_from_env_or_config:
log.info(
"Using backup password from %s environment variable",
MVT_ANDROID_BACKUP_PASSWORD,
"Using backup password from %s environment variable or config file",
"MVT_ANDROID_BACKUP_PASSWORD",
)
return password_from_env
return password_from_env_or_config


def prompt_or_load_android_backup_password(log, module_options):
Expand Down
5 changes: 3 additions & 2 deletions src/mvt/common/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
generate_hashes_from_path,
get_sha256_from_file_path,
)
from mvt.common.config import settings
from mvt.common.version import MVT_VERSION


Expand Down Expand Up @@ -132,7 +133,7 @@ def _store_info(self) -> None:
if ioc_file_path and ioc_file_path not in info["ioc_files"]:
info["ioc_files"].append(ioc_file_path)

if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
if self.target_path and (settings.HASH_FILES or self.hashes):
self.generate_hashes()

info["hashes"] = self.hash_values
Expand All @@ -141,7 +142,7 @@ def _store_info(self) -> None:
with open(info_path, "w+", encoding="utf-8") as handle:
json.dump(info, handle, indent=4)

if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
if self.target_path and (settings.HASH_FILES or self.hashes):
info_hash = get_sha256_from_file_path(info_path)
self.log.info('Reference hash of the info.json file: "%s"', info_hash)

Expand Down
105 changes: 105 additions & 0 deletions src/mvt/common/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import yaml
import json

from typing import Tuple, Type, Optional
from appdirs import user_config_dir
from pydantic import AnyHttpUrl, Field
from pydantic_settings import (
BaseSettings,
InitSettingsSource,
PydanticBaseSettingsSource,
SettingsConfigDict,
YamlConfigSettingsSource,
)

MVT_CONFIG_FOLDER = user_config_dir("mvt")
MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml")


class MVTSettings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="MVT_",
env_nested_delimiter="_",
extra="ignore",
nested_model_default_partial_updates=True,
)
# Allow to decided if want to load environment variables
load_env: bool = Field(True, exclude=True)

# General settings
PYPI_UPDATE_URL: AnyHttpUrl = Field(
"https://pypi.org/pypi/mvt/json",
validate_default=False,
)
NETWORK_ACCESS_ALLOWED: bool = True
NETWORK_TIMEOUT: int = 15

# Command default settings, all can be specified by MVT_ prefixed environment variables too.
IOS_BACKUP_PASSWORD: Optional[str] = Field(
None, description="Default password to use to decrypt iOS backups"
)
ANDROID_BACKUP_PASSWORD: Optional[str] = Field(
None, description="Default password to use to decrypt Android backups"
)
STIX2: Optional[str] = Field(
None, description="List of directories where STIX2 files are stored"
)
VT_API_KEY: Optional[str] = Field(
None, description="API key to use for VirusTotal lookups"
)
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
HASH_FILES: bool = Field(False, description="Should MVT hash output files")

@classmethod
def settings_customise_sources(
cls,
settings_cls: Type[BaseSettings],
init_settings: InitSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> Tuple[PydanticBaseSettingsSource, ...]:
sources = (
YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH),
init_settings,
)
# Load env variables if enabled
if init_settings.init_kwargs.get("load_env", True):
sources = (env_settings,) + sources
return sources

def save_settings(
self,
) -> None:
"""
Save the current settings to a file.
"""
if not os.path.isdir(MVT_CONFIG_FOLDER):
os.makedirs(MVT_CONFIG_FOLDER)

# Dump the settings to the YAML file
model_serializable = json.loads(self.model_dump_json(exclude_defaults=True))
with open(MVT_CONFIG_PATH, "w") as config_file:
config_file.write(yaml.dump(model_serializable, default_flow_style=False))

@classmethod
def initialise(cls) -> "MVTSettings":
"""
Initialise the settings file.

We first initialise the settings (without env variable) and then persist
them to file. This way we can update the config file with the default values.

Afterwards we load the settings again, this time including the env variables.
"""
# Set invalid env prefix to avoid loading env variables.
settings = MVTSettings(load_env=False)
settings.save_settings()

# Load the settings again with any ENV variables.
settings = MVTSettings(load_env=True)
return settings


settings = MVTSettings.initialise()
7 changes: 4 additions & 3 deletions src/mvt/common/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from appdirs import user_data_dir

from .url import URL
from .config import settings

MVT_DATA_FOLDER = user_data_dir("mvt")
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
Expand Down Expand Up @@ -41,12 +42,12 @@ def _load_downloaded_indicators(self) -> None:

def _check_stix2_env_variable(self) -> None:
"""
Checks if a variable MVT_STIX2 contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
Checks if MVT_STIX2 setting or environment variable contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
"""
if "MVT_STIX2" not in os.environ:
if not settings.STIX2:
return

paths = os.environ["MVT_STIX2"].split(":")
paths = settings.STIX2.split(":")
for path in paths:
if os.path.isfile(path) and path.lower().endswith(".stix2"):
self.parse_stix2(path)
Expand Down
3 changes: 2 additions & 1 deletion src/mvt/common/updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
from .version import MVT_VERSION
from .config import settings

log = logging.getLogger(__name__)

Expand All @@ -23,7 +24,7 @@

class MVTUpdates:
def check(self) -> str:
res = requests.get("https://pypi.org/pypi/mvt/json", timeout=15)
res = requests.get(settings.PYPI_UPDATE_URL, timeout=15)
data = res.json()
latest_version = data.get("info", {}).get("version", "")

Expand Down
3 changes: 2 additions & 1 deletion src/mvt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Iterator, Union

from rich.logging import RichHandler
from mvt.common.config import settings


class CustomJSONEncoder(json.JSONEncoder):
Expand Down Expand Up @@ -256,7 +257,7 @@ def set_verbose_logging(verbose: bool = False):

def exec_or_profile(module, globals, locals):
"""Hook for profiling MVT modules"""
if int(os.environ.get("MVT_PROFILE", False)):
if settings.PROFILE:
cProfile.runctx(module, globals, locals)
else:
exec(module, globals, locals)
4 changes: 4 additions & 0 deletions tests/common/test_indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import logging
import os


from mvt.common.config import settings
from mvt.common.indicators import Indicators
from ..utils import get_artifact_folder

Expand Down Expand Up @@ -100,6 +102,8 @@ def test_check_android_property(self, indicator_file):

def test_env_stix(self, indicator_file):
os.environ["MVT_STIX2"] = indicator_file
settings.__init__() # Reset settings

ind = Indicators(log=logging)
ind.load_indicators_files([], load_default=False)
assert ind.total_ioc_count == 9
4 changes: 4 additions & 0 deletions tests/test_check_android_androidqf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from click.testing import CliRunner

from mvt.android.cli import check_androidqf
from mvt.common.config import settings

from .utils import get_artifact_folder

Expand Down Expand Up @@ -56,10 +57,13 @@ def test_check_encrypted_backup_env(self, mocker):
)

os.environ["MVT_ANDROID_BACKUP_PASSWORD"] = TEST_BACKUP_PASSWORD
settings.__init__() # Reset settings

runner = CliRunner()
path = os.path.join(get_artifact_folder(), "androidqf_encrypted")
result = runner.invoke(check_androidqf, [path])

assert prompt_mock.call_count == 0
assert result.exit_code == 0
del os.environ["MVT_ANDROID_BACKUP_PASSWORD"]
settings.__init__() # Reset settings
4 changes: 4 additions & 0 deletions tests/test_check_android_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from click.testing import CliRunner

from mvt.android.cli import check_backup
from mvt.common.config import settings

from .utils import get_artifact_folder

Expand Down Expand Up @@ -63,10 +64,13 @@ def test_check_encrypted_backup_env(self, mocker):
)

os.environ["MVT_ANDROID_BACKUP_PASSWORD"] = TEST_BACKUP_PASSWORD
settings.__init__() # Reset settings

runner = CliRunner()
path = os.path.join(get_artifact_folder(), "androidqf_encrypted/backup.ab")
result = runner.invoke(check_backup, [path])

assert prompt_mock.call_count == 0
assert result.exit_code == 0
del os.environ["MVT_ANDROID_BACKUP_PASSWORD"]
settings.__init__() # Reset settings
Loading