Add runner benchmark #4210
Changes from all commits
@@ -0,0 +1,21 @@
This is the benchmark repository of Kedro, which is mainly used internally.

# Installation
`pip install asv`

# Run the benchmark
Run this in the terminal:
`asv run`
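The first run on a new machine will prompt for machine information before any benchmarks execute; `asv machine --yes` accepts the defaults (see the asv docs for details).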
You can also run the benchmark for specific commits or a range of commits; for details, check out the [official documentation](https://asv.readthedocs.io/en/stable/using.html#benchmarking).

For example, `asv run main..mybranch` will run the benchmark against every commit since branching off from `main`.
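You can also restrict a run to a subset of benchmarks with a regular expression, e.g. `asv run --bench RunnerTimeSuite`, or benchmark a branch against `main` and get a comparison in one step with `asv continuous main mybranch` (both per the asv CLI docs).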
## Compare benchmarks for two commits
Run this in the terminal:
`asv compare v0.1 v0.2`

This compares the benchmark results of the two given commits.
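`asv compare` accepts any two git revisions, so something like `asv compare HEAD~1 HEAD` should also work for comparing the last commit against its parent (an assumption based on the asv CLI docs, not tested here).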
@@ -0,0 +1,160 @@
# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.

import importlib
import time
from pathlib import Path

import yaml

from kedro.io.data_catalog import DataCatalog
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline


# Simulate an I/O-bound task
def io_bound_task(input_data):
    time.sleep(2)  # Simulate an I/O wait (e.g., reading from a file)
    output = input_data
    return output


# Simulate a compute-bound task (a long pure-Python integer product)
def compute_bound_task(input_data) -> str:
    # Heavy compute that does not use multiple cores (deliberately not
    # pandas/numpy); the running product is discarded, it only burns CPU
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return "dummy"


def create_data_catalog():
    """
    Use the dataset factory pattern to make sure the benchmark covers the
    slowest path, i.e. dataset names are resolved from patterns at runtime.
    """
    catalog_conf = """
'output_{pattern}':
  type: pandas.CSVDataset
  filepath: benchmarks/data/'{pattern}.csv'

'numpy_{pattern}':
  type: pickle.PickleDataset
  filepath: benchmarks/data/'{pattern}.pkl'

'{catch_all_dataset_pattern}':
  type: pandas.CSVDataset
  filepath: benchmarks/data/data.csv
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    catalog = DataCatalog.from_config(catalog_conf)
    return catalog
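# Editorial note (assumption, not part of this PR): thanks to the catch-all
# '{catch_all_dataset_pattern}' entry above, any otherwise-unregistered dataset
# name resolves at runtime, so the pipelines below never need to register
# their inputs explicitly. For example:
#     catalog = create_data_catalog()
#     df = catalog.load("dummy_1")  # resolves to benchmarks/data/data.csv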
def create_io_bound_node(inputs=None, outputs=None, name=None):
    io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name)
    return io_node


def create_io_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_io_bound_node("dummy_1", "output_1"),
            create_io_bound_node("dummy_2", "output_2"),
            create_io_bound_node("dummy_3", "output_3"),
            create_io_bound_node("dummy_4", "output_4"),
            create_io_bound_node("dummy_5", "output_5"),
            create_io_bound_node("dummy_6", "output_6"),
            create_io_bound_node("dummy_7", "output_7"),
            create_io_bound_node("dummy_1", "output_8"),
            create_io_bound_node("dummy_1", "output_9"),
            create_io_bound_node("dummy_1", "output_10"),
        ]
    )
    return dummy_pipeline


def create_compute_bound_node(inputs=None, outputs=None, name=None):
    compute_node = node(compute_bound_task, inputs=inputs, outputs=outputs, name=name)
    return compute_node


def create_compute_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_compute_bound_node("dummy_1", "numpy_1"),
            create_compute_bound_node("dummy_2", "numpy_2"),
            create_compute_bound_node("dummy_3", "numpy_3"),
            create_compute_bound_node("dummy_4", "numpy_4"),
            create_compute_bound_node("dummy_5", "numpy_5"),
            create_compute_bound_node("dummy_6", "numpy_6"),
            create_compute_bound_node("dummy_7", "numpy_7"),
            create_compute_bound_node("dummy_1", "numpy_8"),
            create_compute_bound_node("dummy_1", "numpy_9"),
            create_compute_bound_node("dummy_1", "numpy_10"),
        ]
    )
    return dummy_pipeline


class RunnerMemorySuite:
    """Memory benchmarks for the three runners, parameterised by asv."""

    params = (
        "SequentialRunner",
        "ThreadRunner",
        "ParallelRunner",
    )
    param_names = ("runner",)

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv for the catch-all catalog pattern to read
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

    def mem_runners(self, runner):
> **Reviewer:** Shall we also test memory usage on various catalogs, and actually return some objects that need sufficient memory rather than a "dummy" string?
>
> **Author:** Do you mean just changing what it returns? We can do that, but I don't know if it changes anything. Do I also need to change the structure of the pipeline? Most data are not stored in …
>
> **Reviewer:** I mean varying the catalog and the number of entries there. Makes sense about returns, thank you.
>
> **Author:** Do you mean adding also the …
>
> **Reviewer:** In general, I mean running tests for different catalog configurations (the number of datasets in the catalog, with/without patterns) so we vary something that actually affects memory usage. We can also vary the pipeline size and the number of input arguments for nodes. In the current setup, we use the same pipeline and catalog configuration, and we try to test memory usage by running nodes with a heavy compute load that should not grow the memory. So it feels like we might add other scenarios that grow it, and then we're able to compare (memory grows as expected) runs on them. Adding …
>
> **Author:** @ElenaKhaustova I see: https://github.com/kedro-org/kedro/blob/main/benchmarks/benchmark_kedrodatacatalog.py. I think this is covered in the catalog test already?
>
> **Reviewer:** Yeah, but we didn't do memory profiling. Maybe worth adding it on the catalog side. Anyway, that's just an idea, so I don't want to block the PR with it.
        # asv's mem_ prefix measures the size of the object a benchmark
        # returns; nothing is returned here yet (see the review thread above)
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)

    def peakmem_runners(self, runner):
        # asv's peakmem_ prefix records the peak memory of the process
        # while the benchmark body runs
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)
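Following the review thread above, here is a minimal sketch of what memory profiling on the catalog side could look like. It is an assumption, not part of this PR: `create_sized_catalog` and `CatalogSizeMemorySuite` are hypothetical names, and the config mirrors the `pandas.CSVDataset` entries used elsewhere in this file.

# Hypothetical sketch (not in this PR): vary the catalog size so memory
# usage has something to grow with, as suggested in the review.
def create_sized_catalog(n_entries):
    conf = {
        f"output_{i}": {
            "type": "pandas.CSVDataset",
            "filepath": f"benchmarks/data/output_{i}.csv",
        }
        for i in range(n_entries)
    }
    return DataCatalog.from_config(conf)


class CatalogSizeMemorySuite:
    params = (10, 100, 1000)
    param_names = ("n_entries",)

    def mem_catalog(self, n_entries):
        # mem_ measures the size of the returned object
        return create_sized_catalog(n_entries)

    def peakmem_catalog(self, n_entries):
        # peakmem_ measures peak process memory during the call
        create_sized_catalog(n_entries)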

class RunnerTimeSuite:
    """Wall-clock benchmarks for the three runners, parameterised by asv."""

    params = (
        "SequentialRunner",
        "ThreadRunner",
        "ParallelRunner",
    )
    param_names = ("runner",)

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

    def time_compute_bound_runner(self, runner):
        """Compute-bound pipeline"""
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)

    def time_io_bound_runner(self, runner):
        """IO-bound pipeline"""
        catalog = create_data_catalog()
        test_pipeline = create_io_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)
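To sanity-check these suites locally without asv, one can mimic asv's calling convention by hand (asv itself calls `setup()` before each benchmark method); a quick, hypothetical smoke test:

# Editorial sketch, not part of this PR
suite = RunnerTimeSuite()
suite.setup()
suite.time_io_bound_runner("SequentialRunner")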
> **Review comment:** Need to revert before merge.