Skip to content

Commit 51f949d

Browse files
authored
Allow to profile tasks. (#88)
1 parent 70278b0 commit 51f949d

File tree

11 files changed

+406
-6
lines changed

11 files changed

+406
-6
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ Here are some details:
124124
- Dependencies and products of a task are tracked via markers. For dependencies use
125125
``@pytask.mark.depends_on`` and for products use ``@pytask.mark.produces``. Use
126126
strings and ``pathlib.Path`` to specify the location. Pass multiple dependencies or
127-
products as a list.
127+
products as a list or a dictionary for positional or key-based access.
128128
- With ``produces`` (and ``depends_on``) as function arguments, you get access to the
129129
dependencies and products inside the function via ``pathlib.Path`` objects. Here,
130130
``produces`` holds the path to ``"hello_earth.txt"``.

docs/changes.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ all releases are available on `PyPI <https://pypi.org/project/pytask>`_ and
1818
- :gh:`84` fixes an error in the path normalization introduced by :gh:`81`.
1919
- :gh:`85` sorts collected tasks, dependencies, and products by name.
2020
- :gh:`87` fixes that dirty versions are displayed in the documentation.
21+
- :gh:`88` adds a ``profile`` command to show information on tasks like duration and
22+
file size of products.
2123

2224

2325
0.0.14 - 2021-03-23

docs/tutorials/how_to_profile.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
How to profile
2+
==============
3+
4+
pytask collects information on the runtime of tasks when they finished successfully. To
5+
display the information, enter
6+
7+
.. code-block:: console
8+
9+
$ pytask profile

docs/tutorials/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ project. Start here if you are a new user.
1717
how_to_select_tasks
1818
how_to_clean
1919
how_to_collect
20+
how_to_profile
2021
how_to_skip_tasks
2122
how_to_make_tasks_persist
2223
how_to_capture

src/_pytask/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def pytask_add_hooks(pm):
5151
from _pytask import parameters
5252
from _pytask import parametrize
5353
from _pytask import persist
54+
from _pytask import profile
5455
from _pytask import resolve_dependencies
5556
from _pytask import skipping
5657

@@ -68,6 +69,7 @@ def pytask_add_hooks(pm):
6869
pm.register(parameters)
6970
pm.register(parametrize)
7071
pm.register(persist)
72+
pm.register(profile)
7173
pm.register(resolve_dependencies)
7274
pm.register(skipping)
7375

src/_pytask/collect_command.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def collect(**config_from_cli):
6666
_print_collected_tasks(dictionary, session.config["nodes"])
6767

6868
console.print()
69-
console.rule(style=None)
69+
console.rule(style=ColorCode.NEUTRAL)
7070

7171
except CollectionError:
7272
session.exit_code = ExitCode.COLLECTION_FAILED

src/_pytask/hookspecs.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,3 +360,22 @@ def pytask_log_session_header(session):
360360
@hookspec
361361
def pytask_log_session_footer(session, infos, duration, color):
362362
"""Log session information at the end of a run."""
363+
364+
365+
# Hooks for profile.
366+
367+
368+
@hookspec
369+
def pytask_profile_add_info_on_task(session, tasks, profile):
370+
"""Add information on task for profile.
371+
372+
Hook implementations can add information to the ``profile`` dictionary. The
373+
dictionary's keys are the task names. The value for each task is a dictionary itself
374+
where keys correspond to columns of the profile table.
375+
376+
"""
377+
378+
379+
@hookspec
380+
def pytask_profile_export_profile(session, profile):
381+
"""Export the profile."""

src/_pytask/nodes.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,12 @@ class PythonFunctionTask(MetaTask):
8787
"""List[MetaNode]: A list of products of task."""
8888
markers = attr.ib(factory=list)
8989
"""Optional[List[Mark]]: A list of markers attached to the task function."""
90-
keep_dict = attr.ib(factory=dict)
90+
keep_dict = attr.ib(default=False)
91+
"""Dict[str, bool]: Should dictionaries for single nodes be preserved?"""
9192
_report_sections = attr.ib(factory=list)
93+
"""List[Tuple[str]]: A list of reports with entries for when, what, and content."""
94+
attributes = attr.ib(factory=dict)
95+
"""Dict[Any, Any]: A dictionary to store additional information of the task."""
9296

9397
@classmethod
9498
def from_path_name_function_session(cls, path, name, function, session):

src/_pytask/parameters.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
@hookimpl(trylast=True)
3232
def pytask_extend_command_line_interface(cli):
3333
"""Register general markers."""
34-
for command in ["build", "clean", "collect", "markers"]:
34+
for command in ["build", "clean", "collect", "markers", "profile"]:
3535
cli.commands[command].params.append(_CONFIG_OPTION)
36-
for command in ["build", "clean", "collect"]:
36+
for command in ["build", "clean", "collect", "profile"]:
3737
cli.commands[command].params.append(_IGNORE_OPTION)
38-
for command in ["build", "clean", "collect"]:
38+
for command in ["build", "clean", "collect", "profile"]:
3939
cli.commands[command].params.append(_PATH_ARGUMENT)

src/_pytask/profile.py

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
"""This module contains the code to profile the execution."""
2+
import csv
3+
import json
4+
import sys
5+
import time
6+
from pathlib import Path
7+
8+
import click
9+
from _pytask.config import hookimpl
10+
from _pytask.console import console
11+
from _pytask.database import db
12+
from _pytask.enums import ColorCode
13+
from _pytask.enums import ExitCode
14+
from _pytask.exceptions import CollectionError
15+
from _pytask.exceptions import ConfigurationError
16+
from _pytask.nodes import FilePathNode
17+
from _pytask.nodes import reduce_node_name
18+
from _pytask.pluginmanager import get_plugin_manager
19+
from _pytask.session import Session
20+
from _pytask.shared import get_first_non_none_value
21+
from pony import orm
22+
from rich.table import Table
23+
from rich.traceback import Traceback
24+
25+
26+
class Runtime(db.Entity):
27+
"""Record of runtimes of tasks."""
28+
29+
task = orm.PrimaryKey(str)
30+
date = orm.Required(float)
31+
duration = orm.Required(float)
32+
33+
34+
@hookimpl(tryfirst=True)
35+
def pytask_extend_command_line_interface(cli: click.Group):
36+
"""Extend the command line interface."""
37+
cli.add_command(profile)
38+
39+
40+
@hookimpl
41+
def pytask_parse_config(config, config_from_cli):
42+
"""Parse the configuration."""
43+
config["export"] = get_first_non_none_value(
44+
config_from_cli, key="export", default=None
45+
)
46+
47+
48+
@hookimpl
49+
def pytask_post_parse(config):
50+
"""Register the export option."""
51+
config["pm"].register(ExportNameSpace)
52+
config["pm"].register(DurationNameSpace)
53+
config["pm"].register(FileSizeNameSpace)
54+
55+
56+
@hookimpl(hookwrapper=True)
57+
def pytask_execute_task(task):
58+
"""Attach the duration of the execution to the task."""
59+
start = time.time()
60+
yield
61+
end = time.time()
62+
task.attributes["duration"] = (start, end)
63+
64+
65+
@hookimpl
66+
def pytask_execute_task_process_report(report):
67+
"""Store runtime of successfully finishing tasks in database."""
68+
task = report.task
69+
duration = task.attributes.get("duration")
70+
if report.success and duration is not None:
71+
_create_or_update_runtime(task.name, *duration)
72+
73+
74+
@orm.db_session
75+
def _create_or_update_runtime(task_name, start, end):
76+
"""Create or update a runtime entry."""
77+
try:
78+
runtime = Runtime[task_name]
79+
except orm.ObjectNotFound:
80+
Runtime(task=task_name, date=start, duration=end - start)
81+
else:
82+
for attr, val in [("date", start), ("duration", end - start)]:
83+
setattr(runtime, attr, val)
84+
85+
86+
@click.command()
87+
@click.option(
88+
"--export",
89+
type=str,
90+
default=None,
91+
help="Export the profile in the specified format.",
92+
)
93+
def profile(**config_from_cli):
94+
"""Show profile information on collected tasks."""
95+
config_from_cli["command"] = "profile"
96+
97+
try:
98+
# Duplication of the same mechanism in :func:`pytask.main.main`.
99+
pm = get_plugin_manager()
100+
from _pytask import cli
101+
102+
pm.register(cli)
103+
pm.hook.pytask_add_hooks(pm=pm)
104+
105+
config = pm.hook.pytask_configure(pm=pm, config_from_cli=config_from_cli)
106+
session = Session.from_config(config)
107+
108+
except (ConfigurationError, Exception):
109+
session = Session({}, None)
110+
session.exit_code = ExitCode.CONFIGURATION_FAILED
111+
console.print(Traceback.from_exception(*sys.exc_info()))
112+
113+
else:
114+
try:
115+
session.hook.pytask_log_session_header(session=session)
116+
session.hook.pytask_collect(session=session)
117+
session.hook.pytask_resolve_dependencies(session=session)
118+
119+
profile = {task.name: {} for task in session.tasks}
120+
session.hook.pytask_profile_add_info_on_task(
121+
session=session, tasks=session.tasks, profile=profile
122+
)
123+
profile = _process_profile(profile)
124+
125+
_print_profile_table(profile, session.tasks, session.config["paths"])
126+
127+
session.hook.pytask_profile_export_profile(session=session, profile=profile)
128+
129+
console.rule(style=ColorCode.NEUTRAL)
130+
131+
except CollectionError:
132+
session.exit_code = ExitCode.COLLECTION_FAILED
133+
134+
except Exception:
135+
session.exit_code = ExitCode.FAILED
136+
console.print_exception()
137+
console.rule(style=ColorCode.FAILED)
138+
139+
sys.exit(session.exit_code)
140+
141+
142+
def _print_profile_table(profile, tasks, paths):
143+
"""Print the profile table."""
144+
name_to_task = {task.name: task for task in tasks}
145+
info_names = _get_info_names(profile)
146+
147+
console.print()
148+
if profile:
149+
table = Table("Task")
150+
for name in info_names:
151+
table.add_column(name, justify="right")
152+
153+
for task_name, info in profile.items():
154+
reduced_name = reduce_node_name(name_to_task[task_name], paths)
155+
infos = [str(i) for i in info.values()]
156+
table.add_row(reduced_name, *infos)
157+
158+
console.print(table)
159+
else:
160+
console.print("No information is stored on the collected tasks.")
161+
162+
163+
class DurationNameSpace:
164+
@staticmethod
165+
@hookimpl
166+
def pytask_profile_add_info_on_task(tasks, profile):
167+
runtimes = _collect_runtimes([task.name for task in tasks])
168+
for name, duration in runtimes.items():
169+
profile[name]["Last Duration (in s)"] = round(duration, 2)
170+
171+
172+
@orm.db_session
173+
def _collect_runtimes(task_names):
174+
"""Collect runtimes."""
175+
runtimes = [Runtime.get(task=task_name) for task_name in task_names]
176+
runtimes = [r for r in runtimes if r is not None]
177+
return {r.task: r.duration for r in runtimes}
178+
179+
180+
class FileSizeNameSpace:
181+
@staticmethod
182+
@hookimpl
183+
def pytask_profile_add_info_on_task(session, tasks, profile):
184+
for task in tasks:
185+
successors = list(session.dag.successors(task.name))
186+
if successors:
187+
sum_bytes = 0
188+
for successor in successors:
189+
node = session.dag.nodes[successor]["node"]
190+
if isinstance(node, FilePathNode):
191+
try:
192+
sum_bytes += node.path.stat().st_size
193+
except FileNotFoundError:
194+
pass
195+
196+
profile[task.name]["Size of Products"] = _to_human_readable_size(
197+
sum_bytes
198+
)
199+
200+
201+
def _to_human_readable_size(bytes_, units=None):
202+
"""Convert bytes to a human readable size."""
203+
units = [" bytes", "KB", "MB", "GB", "TB"] if units is None else units
204+
return (
205+
str(bytes_) + units[0]
206+
if bytes_ < 1024
207+
else _to_human_readable_size(bytes_ >> 10, units[1:])
208+
)
209+
210+
211+
def _process_profile(profile):
212+
"""Process profile to make it ready for printing and storing."""
213+
info_names = _get_info_names(profile)
214+
if info_names:
215+
complete_profiles = {
216+
task_name: {
217+
attr_name: profile[task_name].get(attr_name, "")
218+
for attr_name in info_names
219+
}
220+
for task_name in sorted(profile)
221+
}
222+
else:
223+
complete_profiles = {}
224+
return complete_profiles
225+
226+
227+
class ExportNameSpace:
228+
@staticmethod
229+
@hookimpl(trylast=True)
230+
def pytask_profile_export_profile(session, profile):
231+
extension = session.config["export"]
232+
233+
if extension == "csv":
234+
_export_to_csv(profile)
235+
elif extension == "json":
236+
_export_to_json(profile)
237+
elif extension is None:
238+
pass
239+
else:
240+
raise ValueError(f"The export option '{extension}' cannot be handled.")
241+
242+
243+
def _export_to_csv(profile):
244+
"""Export profile to csv."""
245+
info_names = _get_info_names(profile)
246+
path = Path.cwd().joinpath("profile.csv")
247+
248+
with open(path, "w", newline="") as file:
249+
writer = csv.writer(file)
250+
writer.writerow(("Task", *info_names))
251+
for task_name, info in profile.items():
252+
writer.writerow((task_name, *info.values()))
253+
254+
255+
def _export_to_json(profile):
256+
"""Export profile to json."""
257+
json_ = json.dumps(profile)
258+
path = Path.cwd().joinpath("profile.json")
259+
path.write_text(json_)
260+
261+
262+
def _get_info_names(profile):
263+
"""Get names of infos of tasks."""
264+
info_names = sorted(set().union(*[set(val) for val in profile.values()]))
265+
return info_names

0 commit comments

Comments
 (0)