Add runner benchmark (#4210)
* add benchmark dependencies

Signed-off-by: Nok <nok.lam.chan@quantumblack.com>

* add structure

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* add benchmark

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* tmp commit

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* force benchmarks to be a package

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* rename folder

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* fix asv config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* rename

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update format

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* typo

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* incorrect config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* back to kedro_benchmarks

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* rename benchmark file

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* clean up

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update asv config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* fix memory test

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* remove memory tracking since it's not meaningful

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* test

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* commit benchmark module

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* ADD README

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* rename kedro_benchmarks

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* update asv config

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* lint

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

* test matrix of runner

Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>

---------

Signed-off-by: Nok <nok.lam.chan@quantumblack.com>
Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>
noklam authored Oct 28, 2024
1 parent a5d9bb4 commit 1ecb0c8
Showing 9 changed files with 194 additions and 21 deletions.
13 changes: 9 additions & 4 deletions asv.conf.json
@@ -3,16 +3,21 @@
    "project": "Kedro",
    "project_url": "https://kedro.org/",
    "repo": ".",
-   "install_command": ["pip install -e ."],
-   "branches": ["main"],
+   "install_command": [
+       "pip install -e . kedro-datasets[pandas-csvdataset]"
+   ],
+   "branches": [
+       "noklam/stress-testing-runners-4127"
+   ],
    "environment_name": "kedro",
    "environment_type": "virtualenv",
    "show_commit_url": "http://github.com/kedro-org/kedro/commit/",
    "results_dir": ".asv/results",
+   "benchmark_dir": "kedro_benchmarks",
    "html_dir": ".asv/html",
    "matrix": {
        "req": {
-           "kedro-datasets": [],
-           "pandas": []
+           "kedro-datasets[pandas]": []
        }
    }
}
16 changes: 0 additions & 16 deletions benchmarks/benchmark_dummy.py

This file was deleted.

21 changes: 21 additions & 0 deletions kedro_benchmarks/README.md
@@ -0,0 +1,21 @@
This is the benchmark suite for Kedro, which is mainly used internally.

# Installation
`pip install asv`


# Run the benchmark
Run this in the terminal:
`asv run`

You can also run the benchmark for specific commits or a range of commits; for details,
check out the [official documentation](https://asv.readthedocs.io/en/stable/using.html#benchmarking).

For example, `asv run main..mybranch` will run the benchmark against every commit since branching off from
`main`.

## Compare benchmarks for two commits
Run this in the terminal:
`asv compare v0.1 v0.2`

This runs the benchmark against the two given commits.
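
## View the results
After a run, `asv publish` builds a static HTML report from the collected results, and `asv preview` serves it locally; see the asv documentation for details.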
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
160 changes: 160 additions & 0 deletions kedro_benchmarks/benchmark_runner.py
@@ -0,0 +1,160 @@
# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.

import importlib
import time
from pathlib import Path

import yaml

from kedro.io.data_catalog import DataCatalog
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline


# Simulate an I/O-bound task
def io_bound_task(input_data):
    time.sleep(2)  # Simulate an I/O wait (e.g. reading from a file)
    output = input_data
    return output


# Simulate a compute-bound task (a large integer product)
def compute_bound_task(input_data) -> str:
    # Simulate heavy compute that does not use multiple cores
    # (unlike pandas/numpy etc.)
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return "dummy"


def create_data_catalog():
    """
    Use the dataset factory pattern to make sure the benchmark covers the slowest path.
    """
    catalog_conf = """
'output_{pattern}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/{pattern}.csv
'numpy_{pattern}':
    type: pickle.PickleDataset
    filepath: benchmarks/data/{pattern}.pkl
'{catch_all_dataset_pattern}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/data.csv
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    catalog = DataCatalog.from_config(catalog_conf)
    return catalog


def create_io_bound_node(inputs=None, outputs=None, name=None):
    io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name)
    return io_node


def create_io_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_io_bound_node("dummy_1", "output_1"),
            create_io_bound_node("dummy_2", "output_2"),
            create_io_bound_node("dummy_3", "output_3"),
            create_io_bound_node("dummy_4", "output_4"),
            create_io_bound_node("dummy_5", "output_5"),
            create_io_bound_node("dummy_6", "output_6"),
            create_io_bound_node("dummy_7", "output_7"),
            create_io_bound_node("dummy_1", "output_8"),
            create_io_bound_node("dummy_1", "output_9"),
            create_io_bound_node("dummy_1", "output_10"),
        ]
    )
    return dummy_pipeline


def create_compute_bound_node(inputs=None, outputs=None, name=None):
    compute_node = node(compute_bound_task, inputs=inputs, outputs=outputs, name=name)
    return compute_node


def create_compute_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_compute_bound_node("dummy_1", "numpy_1"),
            create_compute_bound_node("dummy_2", "numpy_2"),
            create_compute_bound_node("dummy_3", "numpy_3"),
            create_compute_bound_node("dummy_4", "numpy_4"),
            create_compute_bound_node("dummy_5", "numpy_5"),
            create_compute_bound_node("dummy_6", "numpy_6"),
            create_compute_bound_node("dummy_7", "numpy_7"),
            create_compute_bound_node("dummy_1", "numpy_8"),
            create_compute_bound_node("dummy_1", "numpy_9"),
            create_compute_bound_node("dummy_1", "numpy_10"),
        ]
    )
    return dummy_pipeline


class RunnerMemorySuite:
    params = (
        "SequentialRunner",
        "ThreadRunner",
        "ParallelRunner",
    )
    param_names = ("runner",)

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

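    # asv conventions: a ``mem_*`` benchmark reports the size of the object
    # returned by the method, while ``peakmem_*`` reports the peak resident
    # memory of the whole process while the method runs.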
    def mem_runners(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)

    def peakmem_runners(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)


class RunnerTimeSuite:
    params = (
        "SequentialRunner",
        "ThreadRunner",
        "ParallelRunner",
    )
    param_names = ("runner",)

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

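    # asv treats methods prefixed with ``time_`` as wall-clock timing
    # benchmarks, measured separately for each runner parameter.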
    def time_compute_bound_runner(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)

    def time_io_bound_runner(self, runner):
        """IO bound pipeline"""
        catalog = create_data_catalog()
        test_pipeline = create_io_bound_pipeline()
        runner_module = importlib.import_module("kedro.runner")
        runner_obj = getattr(runner_module, runner)()
        runner_obj.run(test_pipeline, catalog=catalog)
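
For a quick local sanity check outside asv, the suites can also be exercised directly. A minimal sketch (hypothetical usage, assuming it runs from the repository root with `kedro` and `kedro-datasets[pandas]` installed, as in the asv `install_command`):

```python
from kedro_benchmarks.benchmark_runner import RunnerTimeSuite

suite = RunnerTimeSuite()
suite.setup()  # creates benchmarks/data/data.csv
suite.time_compute_bound_runner("SequentialRunner")  # one timed pipeline run
```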
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -90,7 +90,10 @@ jupyter = [
    "ipylab>=1.0.0",
    "notebook>=7.0.0" # requires the new share backend of notebook and labs"
]
-all = [ "kedro[test,docs,jupyter]" ]
+benchmark = [
+    "asv"
+]
+all = [ "kedro[test,docs,jupyter,benchmark]" ]

[project.urls]
Homepage = "https://kedro.org"
