
enable multiprocessing on other platforms
asottile committed Oct 27, 2022
1 parent ebbb57d commit 0d667a7
Showing 7 changed files with 175 additions and 317 deletions.
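The core of the change is visible in the checker.py diff below: instead of refusing to parallelize when the multiprocessing start method is not `fork`, the pool now receives an initializer that rebuilds plugin and option state inside each worker. The following is a hedged, self-contained sketch of that standard-library pattern only; the names and values are illustrative and are not taken from flake8:

```python
import multiprocessing

_state = None  # module-level state that the workers need


def _init(argv):
    # Runs once per worker. Under "spawn" each worker starts from a fresh
    # interpreter, so nothing assigned in the parent is inherited; the state
    # must be rebuilt from the (picklable) initargs instead.
    global _state
    _state = f"options parsed from {argv!r}"  # stand-in for parse_args(argv)


def _work(filename):
    return filename, _state


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")  # e.g. Windows, recent macOS
    with ctx.Pool(2, _init, initargs=(["--max-line-length=100"],)) as pool:
        print(list(pool.imap_unordered(_work, ["a.py", "b.py"])))
```

Under `fork`, the same globals are simply inherited from the parent, which is the fast path the commit keeps via the `_mp_prefork` context manager.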
25 changes: 4 additions & 21 deletions src/flake8/api/legacy.py
@@ -10,11 +10,10 @@
import os.path
from typing import Any

import flake8
from flake8.discover_files import expand_paths
from flake8.formatting import base as formatter
from flake8.main import application as app
from flake8.options import config
from flake8.options.parse_args import parse_args

LOG = logging.getLogger(__name__)

@@ -163,7 +162,7 @@ def init_report(
# Stop cringing... I know it's gross.
self._application.make_guide()
self._application.file_checker_manager = None
self._application.make_file_checker_manager()
self._application.make_file_checker_manager([])

def input_file(
self,
@@ -200,23 +199,7 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
An initialized StyleGuide
"""
application = app.Application()
prelim_opts, remaining_args = application.parse_preliminary_options([])
flake8.configure_logging(prelim_opts.verbose, prelim_opts.output_file)

cfg, cfg_dir = config.load_config(
config=prelim_opts.config,
extra=prelim_opts.append_config,
isolated=prelim_opts.isolated,
)

application.find_plugins(
cfg,
cfg_dir,
enable_extensions=prelim_opts.enable_extensions,
require_plugins=prelim_opts.require_plugins,
)
application.register_plugin_options()
application.parse_configuration_and_cli(cfg, cfg_dir, remaining_args)
application.plugins, application.options = parse_args([])
# We basically want application.initialize to be called but with these
# options set instead before we make our formatter, notifier, internal
# style guide and file checker manager.
@@ -229,5 +212,5 @@ def get_style_guide(**kwargs: Any) -> StyleGuide:
LOG.error('Could not update option "%s"', key)
application.make_formatter()
application.make_guide()
application.make_file_checker_manager()
application.make_file_checker_manager([])
return StyleGuide(application)
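For orientation, the helper simplified above is flake8's documented legacy API entry point: it now builds its plugins and options through `parse_args([])` rather than the previous preliminary-options/config/plugin dance. A minimal usage sketch (the checked path is made up) might look like:

```python
from flake8.api import legacy

# Keyword arguments are applied on top of the options parse_args([]) produces.
style_guide = legacy.get_style_guide(ignore=["E24", "W503"])
report = style_guide.check_files(["example.py"])  # hypothetical path
print(report.total_errors)
```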
171 changes: 77 additions & 94 deletions src/flake8/checker.py
@@ -2,22 +2,25 @@
from __future__ import annotations

import argparse
import collections
import contextlib
import errno
import logging
import multiprocessing.pool
import signal
import tokenize
from typing import Any
from typing import Generator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple

from flake8 import defaults
from flake8 import exceptions
from flake8 import processor
from flake8 import utils
from flake8.discover_files import expand_paths
from flake8.options.parse_args import parse_args
from flake8.plugins.finder import Checkers
from flake8.plugins.finder import LoadedPlugin
from flake8.style_guide import StyleGuideManager
@@ -41,6 +44,41 @@
# noise in diffs.
}

_mp_plugins: Checkers
_mp_options: argparse.Namespace


@contextlib.contextmanager
def _mp_prefork(
plugins: Checkers, options: argparse.Namespace
) -> Generator[None, None, None]:
# we can save significant startup work w/ `fork` multiprocessing
global _mp_plugins, _mp_options
_mp_plugins, _mp_options = plugins, options
try:
yield
finally:
del _mp_plugins, _mp_options


def _mp_init(argv: Sequence[str]) -> None:
global _mp_plugins, _mp_options

# Ensure correct signaling of ^C using multiprocessing.Pool.
signal.signal(signal.SIGINT, signal.SIG_IGN)

try:
_mp_plugins, _mp_options # for `fork` this'll already be set
except NameError:
plugins, options = parse_args(argv)
_mp_plugins, _mp_options = plugins.checkers, options


def _mp_run(filename: str) -> tuple[str, Results, dict[str, int]]:
return FileChecker(
filename=filename, plugins=_mp_plugins, options=_mp_options
).run_checks()


class Manager:
"""Manage the parallelism and checker instances for each plugin and file.
@@ -65,45 +103,36 @@ def __init__(
self,
style_guide: StyleGuideManager,
plugins: Checkers,
argv: Sequence[str],
) -> None:
"""Initialize our Manager instance."""
self.style_guide = style_guide
self.options = style_guide.options
self.plugins = plugins
self.jobs = self._job_count()
self._all_checkers: list[FileChecker] = []
self.checkers: list[FileChecker] = []
self.statistics = {
"files": 0,
"logical lines": 0,
"physical lines": 0,
"tokens": 0,
}
self.exclude = (*self.options.exclude, *self.options.extend_exclude)
self.argv = argv
self.results: list[tuple[str, Results, dict[str, int]]] = []

def _process_statistics(self) -> None:
for checker in self.checkers:
for _, _, statistics in self.results:
for statistic in defaults.STATISTIC_NAMES:
self.statistics[statistic] += checker.statistics[statistic]
self.statistics["files"] += len(self.checkers)
self.statistics[statistic] += statistics[statistic]
self.statistics["files"] += len(self.filenames)

def _job_count(self) -> int:
# First we walk through all of our error cases:
# - multiprocessing library is not present
# - we're running on windows in which case we know we have significant
# implementation issues
# - the user provided stdin and that's not something we can handle
# well
# - the user provided some awful input

# class state is only preserved when using the `fork` strategy.
if multiprocessing.get_start_method() != "fork":
LOG.warning(
"The multiprocessing module is not available. "
"Ignoring --jobs arguments."
)
return 0

if utils.is_using_stdin(self.options.filenames):
LOG.warning(
"The --jobs option is not compatible with supplying "
@@ -141,27 +170,6 @@ def _handle_results(self, filename: str, results: Results) -> int:
)
return reported_results_count

def make_checkers(self, paths: list[str] | None = None) -> None:
"""Create checkers for each file."""
if paths is None:
paths = self.options.filenames

self._all_checkers = [
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
)
for filename in expand_paths(
paths=paths,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
)
]
self.checkers = [c for c in self._all_checkers if c.should_process]
LOG.info("Checking %d files", len(self.checkers))

def report(self) -> tuple[int, int]:
"""Report all of the errors found in the managed file checkers.
Expand All @@ -172,40 +180,25 @@ def report(self) -> tuple[int, int]:
A tuple of the total results found and the results reported.
"""
results_reported = results_found = 0
for checker in self._all_checkers:
results = sorted(checker.results, key=lambda tup: (tup[1], tup[2]))
filename = checker.display_name
for filename, results, _ in self.results:
results.sort(key=lambda tup: (tup[1], tup[2]))
with self.style_guide.processing_file(filename):
results_reported += self._handle_results(filename, results)
results_found += len(results)
return (results_found, results_reported)

def run_parallel(self) -> None:
"""Run the checkers in parallel."""
# fmt: off
final_results: dict[str, list[tuple[str, int, int, str, str | None]]] = collections.defaultdict(list) # noqa: E501
final_statistics: dict[str, dict[str, int]] = collections.defaultdict(dict) # noqa: E501
# fmt: on

pool = _try_initialize_processpool(self.jobs)
with _mp_prefork(self.plugins, self.options):
pool = _try_initialize_processpool(self.jobs, self.argv)

if pool is None:
self.run_serial()
return

pool_closed = False
try:
pool_map = pool.imap_unordered(
_run_checks,
self.checkers,
chunksize=calculate_pool_chunksize(
len(self.checkers), self.jobs
),
)
for ret in pool_map:
filename, results, statistics = ret
final_results[filename] = results
final_statistics[filename] = statistics
self.results = list(pool.imap_unordered(_mp_run, self.filenames))
pool.close()
pool.join()
pool_closed = True
@@ -214,15 +207,16 @@ def run_parallel(self) -> None:
pool.terminate()
pool.join()

for checker in self.checkers:
filename = checker.display_name
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]

def run_serial(self) -> None:
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
self.results = [
FileChecker(
filename=filename,
plugins=self.plugins,
options=self.options,
).run_checks()
for filename in self.filenames
]

def run(self) -> None:
"""Run all the checkers.
@@ -234,23 +228,30 @@
:issue:`117`) this also implements fallback to serial processing.
"""
try:
if self.jobs > 1 and len(self.checkers) > 1:
if self.jobs > 1 and len(self.filenames) > 1:
self.run_parallel()
else:
self.run_serial()
except KeyboardInterrupt:
LOG.warning("Flake8 was interrupted by the user")
raise exceptions.EarlyQuit("Early quit while running checks")

def start(self, paths: list[str] | None = None) -> None:
def start(self) -> None:
"""Start checking files.
:param paths:
Path names to check. This is passed directly to
:meth:`~Manager.make_checkers`.
"""
LOG.info("Making checkers")
self.make_checkers(paths)
self.filenames = tuple(
expand_paths(
paths=self.options.filenames,
stdin_display_name=self.options.stdin_display_name,
filename_patterns=self.options.filename,
exclude=self.exclude,
)
)

def stop(self) -> None:
"""Stop checking files."""
@@ -325,7 +326,7 @@ def report(

def run_check(self, plugin: LoadedPlugin, **arguments: Any) -> Any:
"""Run the check in a single plugin."""
assert self.processor is not None
assert self.processor is not None, self.filename
try:
params = self.processor.keyword_arguments_for(
plugin.parameters, arguments
@@ -409,7 +410,7 @@ def _extract_syntax_information(exception: Exception) -> tuple[int, int]:

def run_ast_checks(self) -> None:
"""Run all checks expecting an abstract syntax tree."""
assert self.processor is not None
assert self.processor is not None, self.filename
ast = self.processor.build_ast()

for plugin in self.plugins.tree:
@@ -514,19 +515,21 @@ def process_tokens(self) -> None:

def run_checks(self) -> tuple[str, Results, dict[str, int]]:
"""Run checks against the file."""
assert self.processor is not None
if self.processor is None or not self.should_process:
return self.display_name, self.results, self.statistics

try:
self.run_ast_checks()
self.process_tokens()
except (SyntaxError, tokenize.TokenError) as e:
code = "E902" if isinstance(e, tokenize.TokenError) else "E999"
row, column = self._extract_syntax_information(e)
self.report(code, row, column, f"{type(e).__name__}: {e.args[0]}")
return self.filename, self.results, self.statistics
return self.display_name, self.results, self.statistics

logical_lines = self.processor.statistics["logical lines"]
self.statistics["logical lines"] = logical_lines
return self.filename, self.results, self.statistics
return self.display_name, self.results, self.statistics

def handle_newline(self, token_type: int) -> None:
"""Handle the logic when encountering a newline token."""
@@ -573,17 +576,13 @@ def check_physical_eol(
self.run_physical_checks(line)


def _pool_init() -> None:
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)


def _try_initialize_processpool(
job_count: int,
argv: Sequence[str],
) -> multiprocessing.pool.Pool | None:
"""Return a new process pool instance if we are able to create one."""
try:
return multiprocessing.Pool(job_count, _pool_init)
return multiprocessing.Pool(job_count, _mp_init, initargs=(argv,))
except OSError as err:
if err.errno not in SERIAL_RETRY_ERRNOS:
raise
Expand All @@ -593,22 +592,6 @@ def _try_initialize_processpool(
return None


def calculate_pool_chunksize(num_checkers: int, num_jobs: int) -> int:
"""Determine the chunksize for the multiprocessing Pool.
- For chunksize, see: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap # noqa
- This formula, while not perfect, aims to give each worker two batches of
work.
- See: https://github.com/pycqa/flake8/issues/829#note_18878876
- See: https://github.com/pycqa/flake8/issues/197
"""
return max(num_checkers // (num_jobs * 2), 1)


def _run_checks(checker: FileChecker) -> tuple[str, Results, dict[str, int]]:
return checker.run_checks()


def find_offset(
offset: int, mapping: processor._LogicalMapping
) -> tuple[int, int]:
(diff for the remaining changed files not shown)

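One detail carried over from the removed `_pool_init` into the new `_mp_init` is the ^C handling: workers ignore SIGINT so the interrupt is delivered only to the parent process, which terminates the pool and re-raises (see the `KeyboardInterrupt` branch in `Manager.run` and the `pool.terminate()`/`pool.join()` calls in `run_parallel`). A hedged, self-contained illustration of that pattern, independent of flake8:

```python
import multiprocessing
import signal
import time


def _init() -> None:
    # Workers ignore ^C so only the parent receives KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def _work(n: int) -> int:
    time.sleep(0.1)
    return n * n


if __name__ == "__main__":
    pool = multiprocessing.Pool(2, _init)
    try:
        print(list(pool.imap_unordered(_work, range(10))))
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        # Only the parent sees ^C: tear the pool down and re-raise, which is
        # roughly what Manager.run()/run_parallel() do before raising EarlyQuit.
        pool.terminate()
        pool.join()
        raise
```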