Skip to content

GH-121970: Extract audit_events into a new extension #122325

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# ---------------------

extensions = [
'audit_events',
'c_annotations',
'glossary_search',
'lexers',
Expand Down
262 changes: 262 additions & 0 deletions Doc/tools/extensions/audit_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
"""Support for documenting audit events."""

from __future__ import annotations

import re
from typing import TYPE_CHECKING

from docutils import nodes
from sphinx.errors import NoUri
from sphinx.locale import _ as sphinx_gettext
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import logging
from sphinx.util.docutils import SphinxDirective

if TYPE_CHECKING:
from collections.abc import Iterator

from sphinx.application import Sphinx
from sphinx.builders import Builder
from sphinx.environment import BuildEnvironment

logger = logging.getLogger(__name__)

# This list of sets are allowable synonyms for event argument names.
# If two names are in the same set, they are treated as equal for the
# purposes of warning. This won't help if the number of arguments is
# different!
_SYNONYMS = [
frozenset({"file", "path", "fd"}),
]


class AuditEvents:
def __init__(self) -> None:
self.events: dict[str, list[str]] = {}
self.sources: dict[str, list[tuple[str, str]]] = {}

def __iter__(self) -> Iterator[tuple[str, list[str], tuple[str, str]]]:
for name, args in self.events.items():
for source in self.sources[name]:
yield name, args, source

def add_event(
self, name, args: list[str], source: tuple[str, str]
) -> None:
if name in self.events:
self._check_args_match(name, args)
else:
self.events[name] = args
self.sources.setdefault(name, []).append(source)

def _check_args_match(self, name: str, args: list[str]) -> None:
current_args = self.events[name]
msg = (
f"Mismatched arguments for audit-event {name}: "
f"{current_args!r} != {args!r}"
)
if current_args == args:
return
if len(current_args) != len(args):
logger.warning(msg)
return
for a1, a2 in zip(current_args, args, strict=False):
if a1 == a2:
continue
if any(a1 in s and a2 in s for s in _SYNONYMS):
continue
logger.warning(msg)
return

def id_for(self, name) -> str:
source_count = len(self.sources.get(name, ()))
name_clean = re.sub(r"\W", "_", name)
return f"audit_event_{name_clean}_{source_count}"

def rows(self) -> Iterator[tuple[str, list[str], list[tuple[str, str]]]]:
for name in sorted(self.events.keys()):
yield name, self.events[name], self.sources[name]


def initialise_audit_events(app: Sphinx) -> None:
"""Initialise the audit_events attribute on the environment."""
if not hasattr(app.env, "audit_events"):
app.env.audit_events = AuditEvents()


def audit_events_purge(
app: Sphinx, env: BuildEnvironment, docname: str
) -> None:
"""This is to remove traces of removed documents from env.audit_events."""
fresh_audit_events = AuditEvents()
for name, args, (doc, target) in env.audit_events:
if doc != docname:
fresh_audit_events.add_event(name, args, (doc, target))


def audit_events_merge(
app: Sphinx,
env: BuildEnvironment,
docnames: list[str],
other: BuildEnvironment,
) -> None:
"""In Sphinx parallel builds, this merges audit_events from subprocesses."""
for name, args, source in other.audit_events:
env.audit_events.add_event(name, args, source)


class AuditEvent(SphinxDirective):
has_content = True
required_arguments = 1
optional_arguments = 2
final_argument_whitespace = True

_label = [
sphinx_gettext(
"Raises an :ref:`auditing event <auditing>` "
"{name} with no arguments."
),
sphinx_gettext(
"Raises an :ref:`auditing event <auditing>` "
"{name} with argument {args}."
),
sphinx_gettext(
"Raises an :ref:`auditing event <auditing>` "
"{name} with arguments {args}."
),
]

def run(self) -> list[nodes.paragraph]:
name = self.arguments[0]
if len(self.arguments) >= 2 and self.arguments[1]:
args = [
arg
for argument in self.arguments[1].strip("'\"").split(",")
if (arg := argument.strip())
]
else:
args = []
ids = []
try:
target = self.arguments[2].strip("\"'")
except (IndexError, TypeError):
target = None
if not target:
target = self.env.audit_events.id_for(name)
ids.append(target)
self.env.audit_events.add_event(name, args, (self.env.docname, target))

node = nodes.paragraph("", classes=["audit-hook"], ids=ids)
self.set_source_info(node)
if self.content:
self.state.nested_parse(self.content, self.content_offset, node)
else:
num_args = min(2, len(args))
text = self._label[num_args].format(
name=f"``{name}``",
args=", ".join(f"``{a}``" for a in args),
)
parsed, messages = self.state.inline_text(text, self.lineno)
node += parsed
node += messages
return [node]


class audit_event_list(nodes.General, nodes.Element): # noqa: N801
pass


class AuditEventListDirective(SphinxDirective):
def run(self) -> list[audit_event_list]:
return [audit_event_list()]


class AuditEventListTransform(SphinxPostTransform):
default_priority = 500

def run(self) -> None:
if self.document.next_node(audit_event_list) is None:
return

table = self._make_table(self.app.builder, self.env.docname)
for node in self.document.findall(audit_event_list):
node.replace_self(table)

def _make_table(self, builder: Builder, docname: str) -> nodes.table:
table = nodes.table(cols=3)
group = nodes.tgroup(
"",
nodes.colspec(colwidth=30),
nodes.colspec(colwidth=55),
nodes.colspec(colwidth=15),
cols=3,
)
head = nodes.thead()
body = nodes.tbody()

table += group
group += head
group += body

head += nodes.row(
"",
nodes.entry("", nodes.paragraph("", "Audit event")),
nodes.entry("", nodes.paragraph("", "Arguments")),
nodes.entry("", nodes.paragraph("", "References")),
)

for name, args, sources in builder.env.audit_events.rows():
body += self._make_row(builder, docname, name, args, sources)

return table

@staticmethod
def _make_row(
builder: Builder,
docname: str,
name: str,
args: list[str],
sources: list[tuple[str, str]],
) -> nodes.row:
row = nodes.row()
name_node = nodes.paragraph("", nodes.Text(name))
row += nodes.entry("", name_node)

args_node = nodes.paragraph()
for arg in args:
args_node += nodes.literal(arg, arg)
args_node += nodes.Text(", ")
if len(args_node.children) > 0:
args_node.children.pop() # remove trailing comma
row += nodes.entry("", args_node)

backlinks_node = nodes.paragraph()
backlinks = enumerate(sorted(set(sources)), start=1)
for i, (doc, label) in backlinks:
if isinstance(label, str):
ref = nodes.reference("", f"[{i}]", internal=True)
try:
target = (
f"{builder.get_relative_uri(docname, doc)}#{label}"
)
except NoUri:
continue
else:
ref["refuri"] = target
backlinks_node += ref
row += nodes.entry("", backlinks_node)
return row


def setup(app: Sphinx):
app.add_directive("audit-event", AuditEvent)
app.add_directive("audit-event-table", AuditEventListDirective)
app.add_post_transform(AuditEventListTransform)
app.connect("builder-inited", initialise_audit_events)
app.connect("env-purge-doc", audit_events_purge)
app.connect("env-merge-info", audit_events_merge)
return {
"version": "1.0",
"parallel_read_safe": True,
"parallel_write_safe": True,
}
Loading
Loading