Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/_static/css/custom.css
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
/* Remove execution count for notebook cells. */
div.prompt {
display: none;
}

img.card-img-top {
height: 52px;
}
Expand Down
7 changes: 7 additions & 0 deletions docs/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ chronological order. Releases follow `semantic versioning <https://semver.org/>`
all releases are available on `PyPI <https://pypi.org/project/pytask>`_ and
`Anaconda.org <https://anaconda.org/conda-forge/pytask>`_.


0.0.15 - 2021-xx-xx
-------------------

- :gh:`80` replaces some remaining formatting using ``pprint`` with ``rich``.


0.0.14 - 2021-03-23
-------------------

Expand Down
8 changes: 6 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@
# root, use os.path.abspath to make it absolute, like shown here.
import os
import sys
from datetime import datetime

import pytask
import sphinx


sys.path.insert(0, os.path.abspath("../src"))


import pytask # noqa: E402


# -- Project information ---------------------------------------------------------------

project = "pytask"
copyright = "2020, Tobias Raabe" # noqa: A001
year = datetime.now().year
copyright = f"2020-{year}, Tobias Raabe" # noqa: A001
author = "Tobias Raabe"

# The full version, including alpha/beta/rc tags
Expand Down
3 changes: 0 additions & 3 deletions docs/rtd_environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,3 @@ dependencies:
- pony >=0.7.13
- pexpect
- rich

- pip:
- -e ../
16 changes: 15 additions & 1 deletion src/_pytask/console.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""This module contains the code to format output on the command line."""
import os
import sys
from typing import List

from rich.console import Console

from rich.tree import Tree

_IS_WSL = "IS_WSL" in os.environ or "WSL_DISTRO_NAME" in os.environ
_IS_WINDOWS_TERMINAL = "WT_SESSION" in os.environ
Expand All @@ -26,3 +27,16 @@


console = Console(color_system=_COLOR_SYSTEM)


def format_strings_as_flat_tree(strings: List[str], title: str, icon: str) -> str:
"""Format list of strings as flat tree."""
tree = Tree(title)
for name in strings:
tree.add(icon + name)

text = "".join(
[x.text for x in tree.__rich_console__(console, console.options)][:-1]
)

return text
27 changes: 22 additions & 5 deletions src/_pytask/dag.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Implement some capabilities to deal with the DAG."""
import itertools
import pprint
from pathlib import Path
from typing import Dict
from typing import Generator
from typing import Iterable
from typing import List

import attr
import networkx as nx
from _pytask.console import format_strings_as_flat_tree
from _pytask.console import TASK_ICON
from _pytask.mark import get_specific_markers_from_task
from _pytask.nodes import MetaTask
from _pytask.nodes import reduce_node_name


def descending_tasks(task_name: str, dag: nx.DiGraph) -> Generator[str, None, None]:
Expand Down Expand Up @@ -52,14 +55,17 @@ class TopologicalSorter:
_nodes_out = attr.ib(factory=set)

@classmethod
def from_dag(cls, dag: nx.DiGraph) -> "TopologicalSorter":
def from_dag(cls, dag: nx.DiGraph, paths: List[Path] = None) -> "TopologicalSorter":
if paths is None:
paths = []

if not dag.is_directed():
raise ValueError("Only directed graphs have a topological order.")

tasks = [
dag.nodes[node]["task"] for node in dag.nodes if "task" in dag.nodes[node]
]
priorities = _extract_priorities_from_tasks(tasks)
priorities = _extract_priorities_from_tasks(tasks, paths)

task_names = {task.name for task in tasks}
task_dict = {name: nx.ancestors(dag, name) & task_names for name in task_names}
Expand Down Expand Up @@ -118,7 +124,9 @@ def static_order(self) -> Generator[str, None, None]:
self.done(new_task)


def _extract_priorities_from_tasks(tasks: List[MetaTask]) -> Dict[str, int]:
def _extract_priorities_from_tasks(
tasks: List[MetaTask], paths: List[Path]
) -> Dict[str, int]:
"""Extract priorities from tasks.

Priorities are set via the ``pytask.mark.try_first`` and ``pytask.mark.try_last``
Expand All @@ -138,10 +146,19 @@ def _extract_priorities_from_tasks(tasks: List[MetaTask]) -> Dict[str, int]:
tasks_w_mixed_priorities = [
name for name, p in priorities.items() if p["try_first"] and p["try_last"]
]

if tasks_w_mixed_priorities:
name_to_task = {task.name: task for task in tasks}
reduced_names = [
reduce_node_name(name_to_task[name], paths)
for name in tasks_w_mixed_priorities
]
text = format_strings_as_flat_tree(
reduced_names, "Tasks with mixed priorities", TASK_ICON
)
raise ValueError(
"'try_first' and 'try_last' cannot be applied on the same task. See the "
f"following tasks for errors:\n\n{pprint.pformat(tasks_w_mixed_priorities)}"
f"following tasks for errors:\n\n{text}"
)

# Recode to numeric values for sorting.
Expand Down
8 changes: 8 additions & 0 deletions src/_pytask/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,11 @@ def reduce_node_name(node, paths: List[Path]):
raise ValueError(f"Unknown node {node} with type '{type(node)}'.")

return name


def reduce_names_of_multiple_nodes(names, dag, paths):
"""Reduce the names of multiple nodes in the DAG."""
return [
reduce_node_name(dag.nodes[n].get("node") or dag.nodes[n].get("task"), paths)
for n in names
]
10 changes: 6 additions & 4 deletions src/_pytask/parametrize.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import copy
import functools
import itertools
import pprint
import types
from typing import Any
from typing import Callable
Expand All @@ -12,7 +11,8 @@
from typing import Union

from _pytask.config import hookimpl
from _pytask.console import console
from _pytask.console import format_strings_as_flat_tree
from _pytask.console import TASK_ICON
from _pytask.mark import Mark
from _pytask.mark import MARK_GEN as mark # noqa: N811
from _pytask.shared import find_duplicates
Expand Down Expand Up @@ -124,10 +124,12 @@ def pytask_parametrize_task(session, name, obj):
names = [i[0] for i in names_and_functions]
duplicates = find_duplicates(names)
if duplicates:
formatted = pprint.pformat(duplicates, width=console.width)
text = format_strings_as_flat_tree(
duplicates, "Duplicated task ids", TASK_ICON
)
raise ValueError(
"The following ids are duplicated while parametrizing task "
f"{obj.__name__}.\n\n{formatted}\n\nIt might be caused by "
f"'{obj.__name__}'.\n\n{text}\n\nIt might be caused by "
"parametrizing the task with the same combination of arguments "
"multiple times. Change the arguments or change the ids generated by "
"the parametrization."
Expand Down
21 changes: 4 additions & 17 deletions src/_pytask/path.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""This module contains code to handle paths."""
import os
from pathlib import Path
from pathlib import PurePath
from typing import List
from typing import Union

Expand Down Expand Up @@ -83,19 +83,6 @@ def find_common_ancestor_of_nodes(*names: str) -> Path:

def find_common_ancestor(*paths: Union[str, Path]) -> Path:
"""Find a common ancestor of many paths."""
paths = [path if isinstance(path, PurePath) else Path(path) for path in paths]

for path in paths:
if not path.is_absolute():
raise ValueError(
f"Cannot find common ancestor for relative paths. {path} is relative."
)

common_parents = set.intersection(*[set(path.parents) for path in paths])

if len(common_parents) == 0:
raise ValueError("Paths have no common ancestor.")
else:
longest_parent = sorted(common_parents, key=lambda x: len(x.parts))[-1]

return longest_parent
path = os.path.commonpath(paths)
path = Path(path)
return path
13 changes: 3 additions & 10 deletions src/_pytask/resolve_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from _pytask.exceptions import NodeNotFoundError
from _pytask.exceptions import ResolvingDependenciesError
from _pytask.mark import Mark
from _pytask.nodes import reduce_names_of_multiple_nodes
from _pytask.nodes import reduce_node_name
from _pytask.path import find_common_ancestor_of_nodes
from _pytask.report import ResolvingDependenciesReport
Expand Down Expand Up @@ -180,7 +181,7 @@ def _check_if_root_nodes_are_available(dag):
short_node_name = reduce_node_name(
dag.nodes[node]["node"], [common_ancestor]
)
short_successors = _reduce_names_of_multiple_nodes(
short_successors = reduce_names_of_multiple_nodes(
dag.successors(node), dag, [common_ancestor]
)
dictionary[short_node_name] = short_successors
Expand Down Expand Up @@ -231,7 +232,7 @@ def _check_if_tasks_have_the_same_products(dag):
short_node_name = reduce_node_name(
dag.nodes[node]["node"], [common_ancestor]
)
short_predecessors = _reduce_names_of_multiple_nodes(
short_predecessors = reduce_names_of_multiple_nodes(
dag.predecessors(node), dag, [common_ancestor]
)
dictionary[short_node_name] = short_predecessors
Expand All @@ -257,11 +258,3 @@ def pytask_resolve_dependencies_log(report):

console.print()
console.rule(style=ColorCode.FAILED)


def _reduce_names_of_multiple_nodes(names, dag, paths):
"""Reduce the names of multiple nodes in the DAG."""
return [
reduce_node_name(dag.nodes[n].get("node") or dag.nodes[n].get("task"), paths)
for n in names
]
42 changes: 34 additions & 8 deletions tests/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@
from _pytask.dag import task_and_descending_tasks
from _pytask.dag import TopologicalSorter
from _pytask.mark import Mark
from _pytask.nodes import MetaTask


@attr.s
class _DummyTask:
class _DummyTask(MetaTask):
name = attr.ib(type=str, converter=str)
markers = attr.ib(factory=list)
path = attr.ib(default=None)
base_name = ""

def execute(self):
pass

def state(self):
pass


@pytest.fixture()
Expand Down Expand Up @@ -59,15 +68,32 @@ def test_node_and_neighbors(dag):
@pytest.mark.parametrize(
"tasks, expectation, expected",
[
([_DummyTask("1", [Mark("try_last", (), {})])], does_not_raise(), {"1": -1}),
([_DummyTask("1", [Mark("try_first", (), {})])], does_not_raise(), {"1": 1}),
([_DummyTask("1", [])], does_not_raise(), {"1": 0}),
(
[_DummyTask("1", [Mark("try_first", (), {}), Mark("try_last", (), {})])],
pytest.param(
[_DummyTask("1", [Mark("try_last", (), {})])],
does_not_raise(),
{"1": -1},
id="test try_last",
),
pytest.param(
[_DummyTask("1", [Mark("try_first", (), {})])],
does_not_raise(),
{"1": 1},
id="test try_first",
),
pytest.param(
[_DummyTask("1", [])], does_not_raise(), {"1": 0}, id="test no priority"
),
pytest.param(
[
_DummyTask(
"1", [Mark("try_first", (), {}), Mark("try_last", (), {})], ""
)
],
pytest.raises(ValueError, match="'try_first' and 'try_last' cannot be"),
{"1": 1},
id="test mixed priorities",
),
(
pytest.param(
[
_DummyTask("1", [Mark("try_first", (), {})]),
_DummyTask("2", []),
Expand All @@ -80,7 +106,7 @@ def test_node_and_neighbors(dag):
)
def test_extract_priorities_from_tasks(tasks, expectation, expected):
with expectation:
result = _extract_priorities_from_tasks(tasks)
result = _extract_priorities_from_tasks(tasks, [])
assert result == expected


Expand Down
28 changes: 19 additions & 9 deletions tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,13 @@ def test_scheduling_w_priorities(tmp_path):
source = """
import pytask


@pytask.mark.try_first
def task_z():
pass


def task_x():
pass
def task_z(): pass

def task_x(): pass

@pytask.mark.try_last
def task_y():
pass
def task_y(): pass
"""
tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))

Expand All @@ -268,3 +262,19 @@ def task_y():
assert session.execution_reports[0].task.name.endswith("task_z")
assert session.execution_reports[1].task.name.endswith("task_x")
assert session.execution_reports[2].task.name.endswith("task_y")


@pytest.mark.end_to_end
def test_scheduling_w_mixed_priorities(tmp_path):
source = """
import pytask

@pytask.mark.try_last
@pytask.mark.try_first
def task_mixed(): pass
"""
tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))

session = main({"paths": tmp_path})

assert session.exit_code == 4
Loading