Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ all releases are available on `Anaconda.org <https://anaconda.org/pytask/pytask>
- :gh:`34` skips ``pytask_collect_task_teardown`` if task is None.
- :gh:`35` adds the ability to capture stdout and stderr with the CaptureManager.
- :gh:`36` reworks the debugger to make it work with the CaptureManager.
- :gh:`37` removes reports argument from hooks related to task collection.
- :gh:`37` removes ``reports`` argument from hooks related to task collection.
- :gh:`38` allows to pass dictionaries as dependencies and products and inside the
function ``depends_on`` and ``produces`` become dictionaries.


0.0.8 - 2020-10-04
Expand Down
84 changes: 63 additions & 21 deletions docs/tutorials/how_to_define_dependencies_products.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ The ``@pytask.mark.produces`` decorator attaches a product to a task. The string
task is defined.


Optional usage in signature
---------------------------

As seen before, if you have a task with products (or dependencies), you can use
``produces`` (``depends_on``) as a function argument and receive the path or list of
paths inside the functions. It helps to avoid repetition.


Dependencies
------------

Expand All @@ -64,27 +56,77 @@ Most tasks have dependencies. Similar to products, you can use the
produces.write_text(bold_text)


Optional usage in signature
---------------------------

As seen before, if you have a task with products (or dependencies), you can use
``produces`` (``depends_on``) as a function argument and receive the path or a
dictionary of paths inside the functions. It helps to avoid repetition.


Multiple dependencies and products
----------------------------------

If you have multiple dependencies or products, pass a list to the decorator. Inside the
function you receive a list of :class:`pathlib.Path` as well.
Most tasks have multiple dependencies or products. The easiest way to attach multiple
dependencies or products to a task is to pass a :class:`list`, :class:`tuple` or other
iterator to the decorator which contains :class:`str` or :class:`pathlib.Path`.

.. code-block:: python

@pytask.mark.depends_on(["text_a.txt", "text_b.txt"])
@pytask.mark.produces(["bold_text_a.txt", "bold_text_b.txt"])
def task_make_text_bold(depends_on, produces):
for dependency, product in zip(depends_on, produces):
text = dependency.read_text()
bold_text = f"**{text}**"
product.write_text(bold_text)
@pytask.mark.depends_on(["text_1.txt", "text_2.txt"])
def task_example(depends_on):
pass

The function argument ``depends_on`` or ``produces`` becomes a dictionary where keys are
the positions in the list and values are :class:`pathlib.Path`.

.. code-block:: python

depends_on = {0: Path("text_1.txt"), 1: Path("text_2.txt")}

Why dictionaries and not lists? First, dictionaries with positions as keys behave very
similar to lists and conversion between both is easy.

Secondly, dictionaries allow to access paths to dependencies and products via labels
which is preferred over positional access when tasks become more complex and the order
changes.

To assign labels to dependencies or products, pass a dictionary or a list of tuples with
the name in the first and the path in the second position to the decorator. For example,

.. code-block:: python

@pytask.mark.depends_on({"first": "text_1.txt", "second": "text_2.txt"})
@pytask.mark.produces("out.txt")
def task_example(depends_on, produces):
text = depends_on["first"].read_text() + " " + depends_on["second"].read_text()
produces.write_text(text)

or with tuples

.. code-block:: python

@pytask.mark.depends_on([("first", "text_1.txt"), ("second", "text_2.txt")])
def task_example():
...


Multiple decorators
-------------------

You can also attach multiple decorators to a function which will be merged into a single
dictionary. This might help you to group certain dependencies and apply them to multiple
tasks.

.. code-block:: python

common_dependencies = ["text_1.txt", "text_2.txt"]

The last task is overly complex since it is the same operation performed for two
independent dependencies and products. There must be a better way |tm|, right? Check out
the :doc:`tutorial on parametrization <how_to_parametrize_a_task>`.

.. |tm| unicode:: U+2122
@pytask.mark.depends_on(common_dependencies)
@pytask.mark.depends_on("text_3.txt")
def task_example():
...


.. rubric:: References
Expand Down
3 changes: 1 addition & 2 deletions src/_pytask/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def pytask_post_parse(config):

@click.command()
@click.option(
"-m",
"--mode",
type=click.Choice(["dry-run", "interactive", "force"]),
help=_HELP_TEXT_MODE,
Expand Down Expand Up @@ -166,7 +165,7 @@ def _yield_paths_from_task(task):
"""Yield all paths attached to a task."""
yield task.path
for attribute in ["depends_on", "produces"]:
for node in getattr(task, attribute):
for node in getattr(task, attribute).values():
if isinstance(node.value, Path):
yield node.value

Expand Down
36 changes: 28 additions & 8 deletions src/_pytask/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import importlib
import inspect
import sys
import time
import traceback
from pathlib import Path

Expand All @@ -15,11 +16,14 @@
from _pytask.report import CollectionReport
from _pytask.report import CollectionReportFile
from _pytask.report import CollectionReportTask
from _pytask.report import format_collect_footer


@hookimpl
def pytask_collect(session):
"""Collect tasks."""
session.collection_start = time.time()

reports = _collect_from_paths(session)
tasks = _extract_successful_tasks_from_reports(reports)

Expand All @@ -31,13 +35,12 @@ def pytask_collect(session):
)
reports.append(report)

session.hook.pytask_collect_log(session=session, reports=reports, tasks=tasks)

session.collection_reports = reports
session.tasks = tasks

if any(i for i in reports if not i.successful):
raise CollectionError
session.hook.pytask_collect_log(
session=session, reports=session.collection_reports, tasks=session.tasks
)

return True

Expand Down Expand Up @@ -214,19 +217,36 @@ def _extract_successful_tasks_from_reports(reports):
@hookimpl
def pytask_collect_log(session, reports, tasks):
"""Log collection."""
session.collection_end = time.time()
tm_width = session.config["terminal_width"]

message = f"Collected {len(tasks)} task(s)."
if session.deselected:
message += f" Deselected {len(session.deselected)} task(s)."

n_deselected = len(session.deselected)
if n_deselected:
message += f" Deselected {n_deselected} task(s)."
click.echo(message)

failed_reports = [i for i in reports if not i.successful]
if failed_reports:
click.echo(f"{{:=^{tm_width}}}".format(" Errors during collection "))
click.echo("")
click.echo(f"{{:=^{tm_width}}}".format(" Failures during collection "))

for report in failed_reports:
click.echo(f"{{:_^{tm_width}}}".format(report.format_title()))

click.echo("")

traceback.print_exception(*report.exc_info)

click.echo("")
click.echo("=" * tm_width)

duration = round(session.collection_end - session.collection_start, 2)
click.echo(
format_collect_footer(
len(tasks), len(failed_reports), n_deselected, duration, tm_width
),
nl=True,
)

raise CollectionError
4 changes: 2 additions & 2 deletions src/_pytask/collect_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def _organize_tasks(tasks):

task_dict = {
task_name: {
"depends_on": [node.name for node in task.depends_on],
"produces": [node.name for node in task.produces],
"depends_on": [node.name for node in task.depends_on.values()],
"produces": [node.name for node in task.produces.values()],
}
}

Expand Down
110 changes: 97 additions & 13 deletions src/_pytask/nodes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Deals with nodes which are dependencies or products of a task."""
import functools
import inspect
import itertools
import pathlib
from abc import ABCMeta
from abc import abstractmethod
Expand All @@ -13,7 +14,7 @@
from _pytask.exceptions import NodeNotCollectedError
from _pytask.exceptions import NodeNotFoundError
from _pytask.mark import get_marks_from_obj
from _pytask.shared import to_list
from _pytask.shared import find_duplicates


def depends_on(objects: Union[Any, Iterable[Any]]) -> Union[Any, Iterable[Any]]:
Expand Down Expand Up @@ -68,22 +69,24 @@ class PythonFunctionTask(MetaTask):
"""pathlib.Path: Path to the file where the task was defined."""
function = attr.ib(type=callable)
"""callable: The task function."""
depends_on = attr.ib(converter=to_list)
depends_on = attr.ib(factory=dict)
"""Optional[List[MetaNode]]: A list of dependencies of task."""
produces = attr.ib(converter=to_list)
produces = attr.ib(factory=dict)
"""List[MetaNode]: A list of products of task."""
markers = attr.ib()
markers = attr.ib(factory=list)
"""Optional[List[Mark]]: A list of markers attached to the task function."""
_report_sections = attr.ib(factory=list)

@classmethod
def from_path_name_function_session(cls, path, name, function, session):
"""Create a task from a path, name, function, and session."""
objects = _extract_nodes_from_function_markers(function, depends_on)
dependencies = _collect_nodes(session, path, name, objects)
nodes = _convert_objects_to_node_dictionary(objects, "depends_on")
dependencies = _collect_nodes(session, path, name, nodes)

objects = _extract_nodes_from_function_markers(function, produces)
products = _collect_nodes(session, path, name, objects)
nodes = _convert_objects_to_node_dictionary(objects, "produces")
products = _collect_nodes(session, path, name, nodes)

markers = [
marker
Expand Down Expand Up @@ -118,8 +121,10 @@ def _get_kwargs_from_task_for_function(self):
attribute = getattr(self, name)
kwargs[name] = (
attribute[0].value
if len(attribute) == 1
else [node.value for node in attribute]
if len(attribute) == 1 and 0 in attribute
else {
node_name: node.value for node_name, node in attribute.items()
}
)

return kwargs
Expand Down Expand Up @@ -169,8 +174,9 @@ def state(self):

def _collect_nodes(session, path, name, nodes):
"""Collect nodes for a task."""
collect_nodes = []
for node in nodes:
collected_nodes = {}

for node_name, node in nodes.items():
collected_node = session.hook.pytask_collect_node(
session=session, path=path, node=node
)
Expand All @@ -180,9 +186,9 @@ def _collect_nodes(session, path, name, nodes):
f"'{name}' in '{path}'."
)
else:
collect_nodes.append(collected_node)
collected_nodes[node_name] = collected_node

return collect_nodes
return collected_nodes


def _extract_nodes_from_function_markers(function, parser):
Expand All @@ -195,4 +201,82 @@ def _extract_nodes_from_function_markers(function, parser):
"""
marker_name = parser.__name__
for marker in get_marks_from_obj(function, marker_name):
yield from to_list(parser(*marker.args, **marker.kwargs))
parsed = parser(*marker.args, **marker.kwargs)
yield parsed


def _convert_objects_to_node_dictionary(objects, when):
list_of_tuples = _convert_objects_to_list_of_tuples(objects)
_check_that_names_are_not_used_multiple_times(list_of_tuples, when)
nodes = _convert_nodes_to_dictionary(list_of_tuples)
return nodes


def _convert_objects_to_list_of_tuples(objects):
out = []
for obj in objects:
if isinstance(obj, dict):
obj = obj.items()

if isinstance(obj, Iterable) and not isinstance(obj, str):
for x in obj:
if isinstance(x, Iterable) and not isinstance(x, str):
tuple_x = tuple(x)
if len(tuple_x) in [1, 2]:
out.append(tuple_x)
else:
raise ValueError("ERROR")
else:
out.append((x,))
else:
out.append((obj,))

return out


def _check_that_names_are_not_used_multiple_times(list_of_tuples, when):
"""Check that names of nodes are not assigned multiple times.

Tuples in the list have either one or two elements. The first element in the two
element tuples is the name and cannot occur twice.

Examples
--------
>>> _check_that_names_are_not_used_multiple_times(
... [("a",), ("a", 1)], "depends_on"
... )
>>> _check_that_names_are_not_used_multiple_times(
... [("a", 0), ("a", 1)], "produces"
... )
Traceback (most recent call last):
ValueError: '@pytask.mark.produces' has nodes with the same name: {'a'}

"""
names = [x[0] for x in list_of_tuples if len(x) == 2]
duplicated = find_duplicates(names)

if duplicated:
raise ValueError(
f"'@pytask.mark.{when}' has nodes with the same name: {duplicated}"
)


def _convert_nodes_to_dictionary(list_of_tuples):
nodes = {}
counter = itertools.count()
names = [x[0] for x in list_of_tuples if len(x) == 2]

for tuple_ in list_of_tuples:
if len(tuple_) == 2:
node_name, node = tuple_
nodes[node_name] = node

else:
while True:
node_name = next(counter)
if node_name not in names:
break

nodes[node_name] = tuple_[0]

return nodes
Loading