Skip to content

Commit

Permalink
Policy Generator tool, first pass (#365)
Browse files Browse the repository at this point in the history
* add policy generator

* add unit tests for outcome values and outcome groups

* update requirements.txt

* add unit tests

* add unit tests

* add docs

* add docs

* add docs

* rename DSIO->DSOI

* fix type hints

* add unit test for dp groups

* integrate policy generator with csv_analyzer

* rename nav items
  • Loading branch information
ahouseholder authored Nov 9, 2023
1 parent 5f22632 commit 621ce0f
Show file tree
Hide file tree
Showing 17 changed files with 1,214 additions and 86 deletions.
5 changes: 5 additions & 0 deletions docs/reference/code/outcomes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Outcome Values and Outcome Groups

::: ssvc.outcomes.base

::: ssvc.outcomes.groups
9 changes: 9 additions & 0 deletions docs/reference/code/policy_generator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# SSVC Policy Generator Tool

The SSVC Policy Generator is a Python object that generates an SSVC decision
policy (a decision tree) from a set of input parameters.

It is intended to be used as a library, for example within a Jupyter notebook.


::: ssvc.policy_generator
4 changes: 3 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ nav:
- Technical Impact: 'reference/decision_points/technical_impact.md'
- Value Density: 'reference/decision_points/value_density.md'
- Code:
analyze_csv: 'reference/code/analyze_csv.md'
CSV Analyzer: 'reference/code/analyze_csv.md'
Policy Generator: 'reference/code/policy_generator.md'
Outcomes: 'reference/code/outcomes.md'
- Calculator: 'ssvc-calc/index.html'
- About:
- Intro: 'about/index.md'
Expand Down
7 changes: 4 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mkdocstrings
mkdocstrings-python
mkdocs-print-site-plugin
dataclasses-json
pandas
scikit-learn
jsonschema
pandas~=2.1.2
scikit-learn~=1.3.2
jsonschema~=4.19.2
networkx~=3.1
28 changes: 25 additions & 3 deletions src/ssvc/_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@
author: adh
created_at: 9/20/23 4:51 PM
"""
# Copyright (c) 2023 Carnegie Mellon University and Contributors.
# - see Contributors.md for a full list of Contributors
# - see ContributionInstructions.md for information on how you can Contribute to this project
# Stakeholder Specific Vulnerability Categorization (SSVC) is
# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed
# with this Software or contact permission@sei.cmu.edu for full terms.
# Created, in part, with funding and support from the United States Government
# (see Acknowledgments file). This program may include and/or can make use of
# certain third party source code, object code, documentation and other files
# (“Third Party Software”). See LICENSE.md for more details.
# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the
# U.S. Patent and Trademark Office by Carnegie Mellon University

from dataclasses import dataclass, field
from typing import Optional

Expand Down Expand Up @@ -44,6 +57,18 @@ def exclude_if_none(value):
return value is None


@dataclass_json
@dataclass(kw_only=True)
class _Commented:
    """
    Mixin class for commented SSVC objects.

    Adds a single optional `_comment` string field to any dataclass that
    inherits from it.
    """

    # Optional free-text comment; defaults to None. `exclude_if_none` is
    # passed as the dataclasses_json `exclude` hook, so an unset comment is
    # presumably left out of serialized output -- confirm against the
    # dataclasses_json `config(exclude=...)` documentation.
    _comment: Optional[str] = field(
        default=None, metadata=config(exclude=exclude_if_none)
    )


@dataclass_json
@dataclass(kw_only=True)
class _Base:
Expand All @@ -53,9 +78,6 @@ class _Base:

name: str
description: str
_comment: Optional[str] = field(
default=None, metadata=config(exclude=exclude_if_none)
)


def main():
Expand Down
92 changes: 75 additions & 17 deletions src/ssvc/csv_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@
Higher values imply more important features.
"""

# Copyright (c) 2023 Carnegie Mellon University and Contributors.
# - see Contributors.md for a full list of Contributors
# - see ContributionInstructions.md for information on how you can Contribute to this project
# Stakeholder Specific Vulnerability Categorization (SSVC) is
# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed
# with this Software or contact permission@sei.cmu.edu for full terms.
# Created, in part, with funding and support from the United States Government
# (see Acknowledgments file). This program may include and/or can make use of
# certain third party source code, object code, documentation and other files
# (“Third Party Software”). See LICENSE.md for more details.
# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the
# U.S. Patent and Trademark Office by Carnegie Mellon University

import argparse
import re
import sys
Expand Down Expand Up @@ -97,6 +110,7 @@ def _drop_col_feat_imp(
model_clone.random_state = random_state
# training and scoring the benchmark model
model_clone.fit(X_train, y_train)

benchmark_score = model_clone.score(X_train, y_train)
# list for storing feature importances
importances = []
Expand Down Expand Up @@ -191,20 +205,41 @@ def _parse_args(args) -> argparse.Namespace:
def main():
args = _parse_args(sys.argv[1:])

csvfile = args.csvfile
# read csv
df = pd.read_csv(args.csvfile)
df = _clean_table(df)
df = pd.read_csv(csvfile)

if args.permutation:
imp = permute_feature_importance(df, args.outcol)
print(f"Feature Permutation Importance for {df.columns}")
else:
imp = drop_col_feature_importance(df, args.outcol)
print(f"Drop Column Feature Importance for {df.columns}")

print(imp)


def _create_dt_classifier(
df: pd.DataFrame, target: str, permute: bool = False
) -> (pd.DataFrame, pd.DataFrame):
"""
Build a decision tree classifier and its ordinal-encoded training data from a dataframe
Args:
df: the dataframe to analyze
target: the name of the target column to analyze against
permute: use permutation importance instead of drop column importance
Returns:
a tuple of (the decision tree classifier, the ordinal-encoded feature dataframe, the target values)
Raises:
KeyError: if the target column is not found in the dataframe
"""

df = _clean_table(df)
# check for target column
target = args.outcol
if target not in df.columns:
print(
f"Column '{target}' not found in {list(df.columns)}.\nPlease specify --outcol=<col> and try again."
)
exit(1)
raise KeyError(f"Column '{target}' not found in {list(df.columns)}")

X, y = _split_data(df, target)

# turn features into ordinals
# this assumes that every column is an ordinal label
# and that the ordinals are sorted in ascending order
Expand All @@ -216,19 +251,42 @@ def main():
mapper = {v: k for (k, v) in codes}
X[newcol] = X[c].replace(mapper)
X2 = X[cols]

# construct tree
dt = DecisionTreeClassifier(random_state=99, criterion="entropy")

if args.permutation:
imp = _perm_feat_imp(dt, X2, y)
print(f"Feature Permutation Importance for {args.csvfile}")
else:
# drop columns and re-run
imp = _drop_col_feat_imp(dt, X2, y)
print(f"Drop Column Feature Importance for {args.csvfile}")
return dt, X2, y

print(imp)

def drop_col_feature_importance(df: pd.DataFrame, target: str) -> pd.DataFrame:
    """
    Drop-column feature importance for the feature columns of a dataframe.

    Args:
        df: the dataframe to analyze
        target: name of the column in df to treat as the outcome
    Returns:
        the result of _drop_col_feat_imp, annotated as a dataframe of
        feature importances
    """
    # Build the classifier and its training inputs, then hand all three
    # straight to the drop-column scorer.
    classifier, features, labels = _create_dt_classifier(df, target)
    return _drop_col_feat_imp(classifier, features, labels)


def permute_feature_importance(df: pd.DataFrame, target: str) -> pd.DataFrame:
    """
    Permutation feature importance for the feature columns of a dataframe.

    Args:
        df: the dataframe to analyze
        target: name of the column in df to treat as the outcome
    Returns:
        the result of _perm_feat_imp, annotated as a dataframe of
        feature importances
    """
    # Build the classifier and its training inputs, then hand all three
    # straight to the permutation scorer.
    classifier, features, labels = _create_dt_classifier(df, target)
    return _perm_feat_imp(classifier, features, labels)


if __name__ == "__main__":
Expand Down
115 changes: 64 additions & 51 deletions src/ssvc/decision_points/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@
author: adh
created_at: 9/20/23 10:07 AM
"""
# Copyright (c) 2023 Carnegie Mellon University and Contributors.
# - see Contributors.md for a full list of Contributors
# - see ContributionInstructions.md for information on how you can Contribute to this project
# Stakeholder Specific Vulnerability Categorization (SSVC) is
# licensed under a MIT (SEI)-style license, please see LICENSE.md distributed
# with this Software or contact permission@sei.cmu.edu for full terms.
# Created, in part, with funding and support from the United States Government
# (see Acknowledgments file). This program may include and/or can make use of
# certain third party source code, object code, documentation and other files
# (“Third Party Software”). See LICENSE.md for more details.
# Carnegie Mellon®, CERT® and CERT Coordination Center® are registered in the
# U.S. Patent and Trademark Office by Carnegie Mellon University

import logging
from dataclasses import dataclass, field
from typing import ClassVar, Dict, Tuple
from dataclasses import dataclass
from typing import Iterable

from dataclasses_json import config, dataclass_json
from dataclasses_json import dataclass_json

from ssvc._mixins import _Base, _Keyed, _Namespaced, _Versioned
from ssvc._mixins import _Base, _Commented, _Keyed, _Namespaced, _Versioned

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class _DecisionPoints:
"""
A collection of SSVC decision points.
"""

registry: ClassVar[Dict[str, "SsvcDecisionPoint"]] = {}

def __iter__(self):
return iter(self.registry.values())


REGISTERED_DECISION_POINTS = _DecisionPoints()
REGISTERED_DECISION_POINTS = []


@dataclass_json
Expand All @@ -43,63 +43,76 @@ class SsvcDecisionPointValue(_Base, _Keyed):

@dataclass_json
@dataclass(kw_only=True)
class SsvcDecisionPoint(_Base, _Keyed, _Versioned, _Namespaced):
class SsvcDecisionPoint(
_Base,
_Keyed,
_Versioned,
_Namespaced,
_Commented,
):
"""
Models a single decision point as a list of values.
"""

values: Tuple[SsvcDecisionPointValue]
values: Iterable[SsvcDecisionPointValue] = ()

# this is only for our own use in Python land, exclude it from serialization
_fullname: str = field(
init=False, repr=False, default=None, metadata=config(exclude=lambda x: True)
)
def __iter__(self):
    """
    Allow iteration over the values of this decision point.
    """
    return iter(self.values)

def __post_init__(self):
self._fullname = f"{self.namespace} {self.name} v{self.version}"
logging.debug(f"Add {self._fullname} to registry")
REGISTERED_DECISION_POINTS.registry[self._fullname] = self
global REGISTERED_DECISION_POINTS

def to_table(self):
rows = []
rows.append(f"{self.description}")
rows.append("")
REGISTERED_DECISION_POINTS.append(self)

headings = ["Value", "Key", "Description"]

def make_row(items):
return "| " + " | ".join(items) + " |"
def dp_to_table(dp: SsvcDecisionPoint) -> str:
"""
Convert a decision point to a markdown table.
:param dp: The decision point to convert.
:return: a string containing the markdown table.
"""
rows = []
rows.append(f"{dp.description}")
rows.append("")

headings = ["Value", "Key", "Description"]

rows.append(make_row(headings))
rows.append(make_row(["---" for _ in headings]))
def make_row(items):
return "| " + " | ".join(items) + " |"

for value in self.values:
rows.append(make_row([value.name, value.key, value.description]))
rows.append(make_row(headings))
rows.append(make_row(["---" for _ in headings]))

return "\n".join(rows)
for value in dp.values:
rows.append(make_row([value.name, value.key, value.description]))

return "\n".join(rows)


def main():
opt_none = SsvcDecisionPointValue(
name="None", key="N", description="No exploit available"
)
opt_poc = SsvcDecisionPointValue(
name="PoC", key="P", description="Proof of concept exploit available"
)
opt_active = SsvcDecisionPointValue(
name="Active", key="A", description="Active exploitation observed"
)
opts = [opt_none, opt_poc, opt_active]

dp = SsvcDecisionPoint(
_comment="This is an optional comment that will be included in the object.",
values=opts,
name="Exploitation",
description="Is there an exploit available?",
key="E",
version="1.0.0",
values=(
SsvcDecisionPointValue(
name="None", key="N", description="No exploit available"
),
SsvcDecisionPointValue(
name="PoC",
key="P",
description="Proof of concept exploit available",
),
SsvcDecisionPointValue(
name="Active", key="A", description="Active exploitation observed"
),
),
)

print(dp.to_json(indent=2))


Expand Down
Loading

0 comments on commit 621ce0f

Please sign in to comment.