From 98744d56e7e02b5be51b87b7ce5e502456fde9a1 Mon Sep 17 00:00:00 2001 From: Nok Date: Mon, 7 Oct 2024 09:58:45 +0000 Subject: [PATCH 01/30] add benchmark dependencies Signed-off-by: Nok --- pyproject.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d9ebbfd70b..54dff1f226 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,7 +93,10 @@ jupyter = [ "ipylab>=1.0.0", "notebook>=7.0.0" # requires the new share backend of notebook and labs" ] -all = [ "kedro[test,docs,jupyter]" ] +benchmark = [ + "asv" +] +all = [ "kedro[test,docs,jupyter,benchmark]" ] [project.urls] Homepage = "https://kedro.org" From 216cc7a25efb95b783863b4aa8032125940c86bd Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 9 Oct 2024 12:05:29 +0100 Subject: [PATCH 02/30] add structure Signed-off-by: Nok Lam Chan --- benchmarks/benchmark_dummy.py | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/benchmarks/benchmark_dummy.py b/benchmarks/benchmark_dummy.py index fc047eb712..b88184bfae 100644 --- a/benchmarks/benchmark_dummy.py +++ b/benchmarks/benchmark_dummy.py @@ -1,6 +1,58 @@ # Write the benchmarking functions here. # See "Writing benchmarks" in the asv docs for more information. +from kedro.pipeline import node +from kedro.pipeline.modular_pipeline import pipeline +from kedro.io.data_catalog import DataCatalog +from kedro.runner import SequentialRunner, ThreadRunner, ParallelRunner +import time +import numpy as np +import yaml +import pandas as pd +from kedro.io.memory_dataset import MemoryDataset + +# Simulate an I/O-bound task +def io_bound_task(input_data): + time.sleep(2) # Simulate an I/O wait (e.g., reading from a file) + output = input_data + return output + +# Simulate a compute-bound task (matrix multiplication) +def compute_bound_task(input_data: str) -> str: + # Simulate a heavy computation (e.g., large matrix multiplication) + matrix_size = 1000 + matrix_a = np.random.rand(matrix_size, matrix_size) + matrix_b = np.random.rand(matrix_size, matrix_size) + _ = np.dot(matrix_a, matrix_b) # Matrix multiplication + return f"Computed {input_data}" + +def create_data_catalog(): + """ + Use dataset factory pattern to make sure the benchmark cover the slowest path. 
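+
+    Resolving a factory pattern means every dataset name is matched against
+    the configured patterns at access time instead of being a direct dictionary
+    lookup, which this benchmark treats as the slowest catalog path.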
+ """ + catalog_conf = """ + +'output_{pattern}': + type: pandas.CSVDataset + filepath: '{pattern}.csv' + +'numpy_{pattern}': + type: pickle.PickletDataset + filepath: '{pattern}.pkl' + +'{catch_all_dataset_pattern}': + type: pandas.CSVDataset + filepath: data.csv +""" + catalog_conf = yaml.safe_load(catalog_conf) + catalog = DataCatalog.from_config(catalog_conf) + return catalog + + +def create_io_bound_node(inputs=None, outputs=None,name=None): + io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name) + return io_node + class TimeSuite: """ @@ -14,3 +66,18 @@ def setup(self): def time_keys(self): for key in self.d.keys(): pass + + +if __name__ == "__main__": + print("==="*20) + + dummy_pipeline = pipeline([ + create_io_bound_node("dummy_1","output_1",name="dummy_1"), + create_io_bound_node("dummy_2","output_2",name="dummy_2"), + create_io_bound_node("dummy_3","output_3",name="dummy_3"), + ]) + runner = SequentialRunner() + catalog = create_data_catalog() + runner.run(dummy_pipeline, catalog) + + print("===" * 20 + "Done" + "===" * 20) \ No newline at end of file From a4ab4c4809d1bb2085ce04ee799520877cbd3cf1 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 9 Oct 2024 15:31:21 +0100 Subject: [PATCH 03/30] add benchmark Signed-off-by: Nok Lam Chan --- benchmarks/benchmark_dummy.py | 96 +++++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 21 deletions(-) diff --git a/benchmarks/benchmark_dummy.py b/benchmarks/benchmark_dummy.py index b88184bfae..9c28584c78 100644 --- a/benchmarks/benchmark_dummy.py +++ b/benchmarks/benchmark_dummy.py @@ -11,20 +11,22 @@ import pandas as pd from kedro.io.memory_dataset import MemoryDataset + # Simulate an I/O-bound task def io_bound_task(input_data): time.sleep(2) # Simulate an I/O wait (e.g., reading from a file) output = input_data return output + # Simulate a compute-bound task (matrix multiplication) -def compute_bound_task(input_data: str) -> str: - # Simulate a heavy computation (e.g., large matrix multiplication) - matrix_size = 1000 - matrix_a = np.random.rand(matrix_size, matrix_size) - matrix_b = np.random.rand(matrix_size, matrix_size) - _ = np.dot(matrix_a, matrix_b) # Matrix multiplication - return f"Computed {input_data}" +def compute_bound_task(input_data) -> str: + # Simulate heavy compute that are not using multicore (not pandas/numpy etc) + ans = 1 + for i in range(1, 50000): + ans = ans * i + return "dummy" + def create_data_catalog(): """ @@ -37,7 +39,7 @@ def create_data_catalog(): filepath: '{pattern}.csv' 'numpy_{pattern}': - type: pickle.PickletDataset + type: pickle.PickleDataset filepath: '{pattern}.pkl' '{catch_all_dataset_pattern}': @@ -49,15 +51,57 @@ def create_data_catalog(): return catalog -def create_io_bound_node(inputs=None, outputs=None,name=None): +def create_io_bound_node(inputs=None, outputs=None, name=None): io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name) return io_node +def create_io_bound_pipeline(): + dummy_pipeline = pipeline( + [ + create_io_bound_node("dummy_1", "output_1"), + create_io_bound_node("dummy_2", "output_2"), + create_io_bound_node("dummy_3", "output_3"), + create_io_bound_node("dummy_4", "output_4"), + create_io_bound_node("dummy_5", "output_5"), + create_io_bound_node("dummy_6", "output_6"), + create_io_bound_node("dummy_7", "output_7"), + create_io_bound_node("dummy_1", "output_8"), + create_io_bound_node("dummy_1", "output_9"), + create_io_bound_node("dummy_1", "output_10"), + ] + ) + return dummy_pipeline + + +def 
create_compute_bound_node(inputs=None, outputs=None, name=None): + io_node = node(compute_bound_task, inputs=inputs, outputs=outputs, name=name) + return io_node + + +def create_compute_bound_pipeline(): + dummy_pipeline = pipeline( + [ + create_compute_bound_node("dummy_1", "numpy_1"), + create_compute_bound_node("dummy_2", "numpy_2"), + create_compute_bound_node("dummy_3", "numpy_3"), + create_compute_bound_node("dummy_4", "numpy_4"), + create_compute_bound_node("dummy_5", "numpy_5"), + create_compute_bound_node("dummy_6", "numpy_6"), + create_compute_bound_node("dummy_7", "numpy_7"), + create_compute_bound_node("dummy_1", "numpy_8"), + create_compute_bound_node("dummy_1", "numpy_9"), + create_compute_bound_node("dummy_1", "numpy_10"), + ] + ) + return dummy_pipeline + + class TimeSuite: """ A dummy benchmark suite to test with asv framework. """ + def setup(self): self.d = {} for x in range(500): @@ -69,15 +113,25 @@ def time_keys(self): if __name__ == "__main__": - print("==="*20) - - dummy_pipeline = pipeline([ - create_io_bound_node("dummy_1","output_1",name="dummy_1"), - create_io_bound_node("dummy_2","output_2",name="dummy_2"), - create_io_bound_node("dummy_3","output_3",name="dummy_3"), - ]) - runner = SequentialRunner() - catalog = create_data_catalog() - runner.run(dummy_pipeline, catalog) - - print("===" * 20 + "Done" + "===" * 20) \ No newline at end of file + print("===" * 20) + import time + + result = {} + runner_class = { + "SequentialRunner": SequentialRunner, + "ThreadRunner": ThreadRunner, + "ParallelRunner": ParallelRunner, + } + for name, runner in runner_class.items(): + start = time.time() + print("==" * 20 + name + "==" * 20) + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner = runner() + runner.run(test_pipeline, catalog=catalog) + end = time.time() + result[name] = end - start + print(f"Time spent: {end - start}") + + print("===" * 20 + "Done" + "===" * 20) + print(result) From 92c7556610f341034ffe1b102286bd8615db49d0 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 13:34:31 +0100 Subject: [PATCH 04/30] tmp commit Signed-off-by: Nok Lam Chan --- asv.conf.json | 3 +- benchmarks/benchmark_dummy.py | 65 +++++++++++++++++++++-------------- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/asv.conf.json b/asv.conf.json index 2cfcd3a057..bfafa946bf 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,8 +3,9 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e ."], + "install_command": ["pip install -e . 
kedro-datasets numpy pandas"], "branches": ["main"], + "environment_name": "kedro", "environment_type": "virtualenv", "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", diff --git a/benchmarks/benchmark_dummy.py b/benchmarks/benchmark_dummy.py index 9c28584c78..8a48920642 100644 --- a/benchmarks/benchmark_dummy.py +++ b/benchmarks/benchmark_dummy.py @@ -10,7 +10,7 @@ import yaml import pandas as pd from kedro.io.memory_dataset import MemoryDataset - +from pathlib import Path # Simulate an I/O-bound task def io_bound_task(input_data): @@ -36,15 +36,15 @@ def create_data_catalog(): 'output_{pattern}': type: pandas.CSVDataset - filepath: '{pattern}.csv' + filepath: benchmarks/data/'{pattern}.csv' 'numpy_{pattern}': type: pickle.PickleDataset - filepath: '{pattern}.pkl' + filepath: benchmarks/data/'{pattern}.pkl' '{catch_all_dataset_pattern}': type: pandas.CSVDataset - filepath: data.csv + filepath: benchmarks/data/data.csv """ catalog_conf = yaml.safe_load(catalog_conf) catalog = DataCatalog.from_config(catalog_conf) @@ -111,27 +111,40 @@ def time_keys(self): for key in self.d.keys(): pass - -if __name__ == "__main__": - print("===" * 20) - import time - - result = {} - runner_class = { - "SequentialRunner": SequentialRunner, - "ThreadRunner": ThreadRunner, - "ParallelRunner": ParallelRunner, - } - for name, runner in runner_class.items(): - start = time.time() - print("==" * 20 + name + "==" * 20) +class RunnerSuite: + """ + A class to collect all runners performance test + """ + params = [SequentialRunner, ThreadRunner, ParallelRunner] + param_names = ["runner"] + timeout= 3600 + + def setup(self, *args, **kwargs): + print("AAAAA", args) + data_dir = Path("benchmarks/data") + data_dir.mkdir(exist_ok=True, parents=True) + # Create a dummy csv + with open(data_dir/"data.csv", "w") as f: + f.write("col1,col2\n1,2\n") + print(f"all files: {list(Path().glob('**/*'))}") + + def time_runners(self,runner): + import os + print(f"Current Directory {os.getcwd()}") catalog = create_data_catalog() + print("**"*20) + print(catalog) test_pipeline = create_compute_bound_pipeline() - runner = runner() - runner.run(test_pipeline, catalog=catalog) - end = time.time() - result[name] = end - start - print(f"Time spent: {end - start}") - - print("===" * 20 + "Done" + "===" * 20) - print(result) + runner_obj = runner() + runner_obj.run(test_pipeline, catalog=catalog) + + def mem_runners(self, runner): + ... + + def peakmem_runners(self, runner): + ... + +if __name__ == "__main__": + suite = RunnerSuite() + for param in suite.params: + suite.time_runners(param) From e1c4156912d390b9d6496d78e47a2a8cbd0f8e1e Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 14:05:03 +0100 Subject: [PATCH 05/30] force benchmarks to be a package Signed-off-by: Nok Lam Chan --- asv.conf.json | 4 ++-- benchmarks/pyproject.toml | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 benchmarks/pyproject.toml diff --git a/asv.conf.json b/asv.conf.json index bfafa946bf..07cbec98a5 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,8 +3,8 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e . kedro-datasets numpy pandas"], - "branches": ["main"], + "install_command": ["pip install -e . 
kedro-datasets numpy pandas benchmarks"], + "branches": ["noklam/stress-testing-runners-4127"], "environment_name": "kedro", "environment_type": "virtualenv", "show_commit_url": "http://github.com/kedro-org/kedro/commit/", diff --git a/benchmarks/pyproject.toml b/benchmarks/pyproject.toml new file mode 100644 index 0000000000..35b221a3b5 --- /dev/null +++ b/benchmarks/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "benchmarks" +version = "0.1.0" +description = "Default template for PDM package" +authors = [ + {name = "Nok Lam Chan", email = "nok.lam.chan@quantumblack.com"}, +] +dependencies = [] +requires-python = "==3.10.*" +readme = "README.md" +license = {text = "MIT"} + + +[tool.pdm] +distribution = false From 06d178db5bd31798f40eac255719e13917b9a2c2 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 14:55:35 +0100 Subject: [PATCH 06/30] update config Signed-off-by: Nok Lam Chan --- asv.conf.json | 2 +- benchmarks/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/asv.conf.json b/asv.conf.json index 07cbec98a5..a7a2fc3d9b 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,7 +3,7 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e . kedro-datasets numpy pandas benchmarks"], + "install_command": ["pip install -e . ./benchmarks/ kedro-datasets numpy pandas "], "branches": ["noklam/stress-testing-runners-4127"], "environment_name": "kedro", "environment_type": "virtualenv", diff --git a/benchmarks/pyproject.toml b/benchmarks/pyproject.toml index 35b221a3b5..8bde8e6544 100644 --- a/benchmarks/pyproject.toml +++ b/benchmarks/pyproject.toml @@ -6,7 +6,7 @@ authors = [ {name = "Nok Lam Chan", email = "nok.lam.chan@quantumblack.com"}, ] dependencies = [] -requires-python = "==3.10.*" +requires-python = ">=3.9.0" readme = "README.md" license = {text = "MIT"} From 783b2e8d533c0d6f82dcaee609c2c4db60144b90 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 14:59:15 +0100 Subject: [PATCH 07/30] rename folder Signed-off-by: Nok Lam Chan --- asv.conf.json | 2 +- benchmarks/pyproject.toml | 15 --------------- .../benchmark_dummy.py | 0 .../src/benchmarks}/__init__.py | 0 kedro_benchmarks/tests/__init__.py | 0 5 files changed, 1 insertion(+), 16 deletions(-) delete mode 100644 benchmarks/pyproject.toml rename {benchmarks => kedro_benchmarks}/benchmark_dummy.py (100%) rename {benchmarks => kedro_benchmarks/src/benchmarks}/__init__.py (100%) create mode 100644 kedro_benchmarks/tests/__init__.py diff --git a/asv.conf.json b/asv.conf.json index a7a2fc3d9b..26a48d5f14 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,7 +3,7 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e . ./benchmarks/ kedro-datasets numpy pandas "], + "install_command": ["pip install -e . 
./kedro_benchmarks/ kedro-datasets numpy pandas "], "branches": ["noklam/stress-testing-runners-4127"], "environment_name": "kedro", "environment_type": "virtualenv", diff --git a/benchmarks/pyproject.toml b/benchmarks/pyproject.toml deleted file mode 100644 index 8bde8e6544..0000000000 --- a/benchmarks/pyproject.toml +++ /dev/null @@ -1,15 +0,0 @@ -[project] -name = "benchmarks" -version = "0.1.0" -description = "Default template for PDM package" -authors = [ - {name = "Nok Lam Chan", email = "nok.lam.chan@quantumblack.com"}, -] -dependencies = [] -requires-python = ">=3.9.0" -readme = "README.md" -license = {text = "MIT"} - - -[tool.pdm] -distribution = false diff --git a/benchmarks/benchmark_dummy.py b/kedro_benchmarks/benchmark_dummy.py similarity index 100% rename from benchmarks/benchmark_dummy.py rename to kedro_benchmarks/benchmark_dummy.py diff --git a/benchmarks/__init__.py b/kedro_benchmarks/src/benchmarks/__init__.py similarity index 100% rename from benchmarks/__init__.py rename to kedro_benchmarks/src/benchmarks/__init__.py diff --git a/kedro_benchmarks/tests/__init__.py b/kedro_benchmarks/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 From 033237b26d87f0ac8864507f4edc6cddeeaad272 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 15:04:46 +0100 Subject: [PATCH 08/30] fix asv config Signed-off-by: Nok Lam Chan --- asv.conf.json | 3 ++- kedro_benchmarks/__init__.py | 0 kedro_benchmarks/pyproject.toml | 15 +++++++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 kedro_benchmarks/__init__.py create mode 100644 kedro_benchmarks/pyproject.toml diff --git a/asv.conf.json b/asv.conf.json index 26a48d5f14..2fbe9128d9 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -9,5 +9,6 @@ "environment_type": "virtualenv", "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", - "html_dir": ".asv/html" + "html_dir": ".asv/html", + "benchmark_dir": "kedro_benchmarks" } diff --git a/kedro_benchmarks/__init__.py b/kedro_benchmarks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/kedro_benchmarks/pyproject.toml b/kedro_benchmarks/pyproject.toml new file mode 100644 index 0000000000..2d4dd5a68b --- /dev/null +++ b/kedro_benchmarks/pyproject.toml @@ -0,0 +1,15 @@ +[project] +name = "kedro_benchmarks" +version = "0.1.0" +description = "Default template for PDM package" +authors = [ + {name = "Nok Lam Chan", email = "nok.lam.chan@quantumblack.com"}, +] +dependencies = [] +requires-python = ">=3.9.0" +readme = "README.md" +license = {text = "MIT"} + + +[tool.pdm] +distribution = false From 0cb6e4fda7fbd4516647140df86c0c5ec9535979 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:34:59 +0100 Subject: [PATCH 09/30] rename Signed-off-by: Nok Lam Chan --- kedro_benchmarks/{benchmark_dummy.py => benchmark_runner.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename kedro_benchmarks/{benchmark_dummy.py => benchmark_runner.py} (100%) diff --git a/kedro_benchmarks/benchmark_dummy.py b/kedro_benchmarks/benchmark_runner.py similarity index 100% rename from kedro_benchmarks/benchmark_dummy.py rename to kedro_benchmarks/benchmark_runner.py From 1b8a7abda2d3514853e0d5db3c3e009f3d21c87c Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:39:53 +0100 Subject: [PATCH 10/30] update format Signed-off-by: Nok Lam Chan --- kedro_benchmarks/benchmark_runner.py | 71 ++++++++++++++++------------ 1 file changed, 40 insertions(+), 31 deletions(-) diff 
--git a/kedro_benchmarks/benchmark_runner.py b/kedro_benchmarks/benchmark_runner.py index 8a48920642..a3ed7ac812 100644 --- a/kedro_benchmarks/benchmark_runner.py +++ b/kedro_benchmarks/benchmark_runner.py @@ -12,6 +12,7 @@ from kedro.io.memory_dataset import MemoryDataset from pathlib import Path + # Simulate an I/O-bound task def io_bound_task(input_data): time.sleep(2) # Simulate an I/O wait (e.g., reading from a file) @@ -97,52 +98,60 @@ def create_compute_bound_pipeline(): return dummy_pipeline -class TimeSuite: - """ - A dummy benchmark suite to test with asv framework. - """ - - def setup(self): - self.d = {} - for x in range(500): - self.d[x] = None - - def time_keys(self): - for key in self.d.keys(): - pass - -class RunnerSuite: - """ - A class to collect all runners performance test - """ +class RunnerMemorySuite: params = [SequentialRunner, ThreadRunner, ParallelRunner] param_names = ["runner"] - timeout= 3600 def setup(self, *args, **kwargs): - print("AAAAA", args) data_dir = Path("benchmarks/data") data_dir.mkdir(exist_ok=True, parents=True) + # Create a dummy csv - with open(data_dir/"data.csv", "w") as f: + with open(data_dir / "data.csv", "w") as f: f.write("col1,col2\n1,2\n") - print(f"all files: {list(Path().glob('**/*'))}") - def time_runners(self,runner): - import os - print(f"Current Directory {os.getcwd()}") + def mem_runners(self, runner): catalog = create_data_catalog() - print("**"*20) - print(catalog) test_pipeline = create_compute_bound_pipeline() runner_obj = runner() runner_obj.run(test_pipeline, catalog=catalog) - def mem_runners(self, runner): - ... - def peakmem_runners(self, runner): - ... + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = runner() + runner_obj.run(test_pipeline, catalog=catalog) + + +class RunnerTimeSuite: + def setup(self, *args, **kwargs): + data_dir = Path("benchmarks/data") + data_dir.mkdir(exist_ok=True, parents=True) + + # Create a dummy csv + with open(data_dir / "data.csv", "w") as f: + f.write("col1,col2\n1,2\n") + + def time_sequential_runner(self): + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = SequentialRunner() + runner_obj.run(test_pipeline, catalog=catalog) + + def time_sequential_runner(self): + """compute bound pipeline""" + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = ParallelRunner() + runner_obj.run(test_pipeline, catalog=catalog) + + def time_thread_runner(self): + """IO bound pipeline""" + catalog = create_data_catalog() + test_pipeline = create_io_bound_pipeline() + runner_obj = ThreadRunner() + runner_obj.run(test_pipeline, catalog=catalog) + if __name__ == "__main__": suite = RunnerSuite() From 4341284513a5120944d265ad0910cf85158de65e Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:40:12 +0100 Subject: [PATCH 11/30] typo Signed-off-by: Nok Lam Chan --- kedro_benchmarks/benchmark_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro_benchmarks/benchmark_runner.py b/kedro_benchmarks/benchmark_runner.py index a3ed7ac812..2593e04fb7 100644 --- a/kedro_benchmarks/benchmark_runner.py +++ b/kedro_benchmarks/benchmark_runner.py @@ -138,7 +138,7 @@ def time_sequential_runner(self): runner_obj = SequentialRunner() runner_obj.run(test_pipeline, catalog=catalog) - def time_sequential_runner(self): + def time_parallel_runner(self): """compute bound pipeline""" catalog = create_data_catalog() test_pipeline = 
create_compute_bound_pipeline() From bc1ec5cfa409d0491769936d5a6571290fdd2648 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:42:29 +0100 Subject: [PATCH 12/30] update Signed-off-by: Nok Lam Chan --- asv.conf.json | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/asv.conf.json b/asv.conf.json index 2fbe9128d9..75b8b3227b 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,12 +3,21 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e . ./kedro_benchmarks/ kedro-datasets numpy pandas "], - "branches": ["noklam/stress-testing-runners-4127"], + "install_command": [ + "pip install -e . ./kedro_benchmarks/ " + ], + "matrix": { + "req": { + "kedro-datasets[pandas]": [] + } + }, + "branches": [ + "noklam/stress-testing-runners-4127" + ], "environment_name": "kedro", "environment_type": "virtualenv", "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", "html_dir": ".asv/html", "benchmark_dir": "kedro_benchmarks" -} +} \ No newline at end of file From 58a70ff6a1164ba8a5754bd9e0346d332b9ce768 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:46:07 +0100 Subject: [PATCH 13/30] incorrect config Signed-off-by: Nok Lam Chan --- asv.conf.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/asv.conf.json b/asv.conf.json index 75b8b3227b..486ffbd4dc 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -8,7 +8,8 @@ ], "matrix": { "req": { - "kedro-datasets[pandas]": [] + "kedro-datasets[pandas]": [], + "numpy": [] } }, "branches": [ From 145af853c74b2130b3898f00574186e07abcb6f1 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:48:37 +0100 Subject: [PATCH 14/30] update Signed-off-by: Nok Lam Chan --- .asv/results/benchmarks.json | 82 ++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 .asv/results/benchmarks.json diff --git a/.asv/results/benchmarks.json b/.asv/results/benchmarks.json new file mode 100644 index 0000000000..7ebc573527 --- /dev/null +++ b/.asv/results/benchmarks.json @@ -0,0 +1,82 @@ +{ + "benchmark_runner.RunnerMemorySuite.mem_runners": { + "code": "class RunnerMemorySuite:\n def mem_runners(self, runner):\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = runner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", + "name": "benchmark_runner.RunnerMemorySuite.mem_runners", + "param_names": [ + "runner" + ], + "params": [ + [ + "", + "", + "" + ] + ], + "type": "memory", + "unit": "bytes", + "version": "13acda1c4dae582477fa994a0910a548a2189f858f5a89ace163cad942928337" + }, + "benchmark_runner.RunnerMemorySuite.peakmem_runners": { + "code": "class RunnerMemorySuite:\n def peakmem_runners(self, runner):\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = runner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", + "name": "benchmark_runner.RunnerMemorySuite.peakmem_runners", + "param_names": [ + 
"runner" + ], + "params": [ + [ + "", + "", + "" + ] + ], + "type": "peakmemory", + "unit": "bytes", + "version": "c906244a890c199414b4d50adc3d6de53c0756386563b573c66627fb492f74ad" + }, + "benchmark_runner.RunnerTimeSuite.time_parallel_runner": { + "code": "class RunnerTimeSuite:\n def time_parallel_runner(self):\n \"\"\"compute bound pipeline\"\"\"\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = ParallelRunner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", + "min_run_count": 2, + "name": "benchmark_runner.RunnerTimeSuite.time_parallel_runner", + "number": 0, + "param_names": [], + "params": [], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "type": "time", + "unit": "seconds", + "version": "bc652bf3bbaddc5fe5838632fb007a7a4e6e99592a452ea38987b498e989331c", + "warmup_time": -1 + }, + "benchmark_runner.RunnerTimeSuite.time_sequential_runner": { + "code": "class RunnerTimeSuite:\n def time_sequential_runner(self):\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = SequentialRunner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", + "min_run_count": 2, + "name": "benchmark_runner.RunnerTimeSuite.time_sequential_runner", + "number": 0, + "param_names": [], + "params": [], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "type": "time", + "unit": "seconds", + "version": "16b755b06d88eedad82d93a3b24dca6efd7cb45dc252319feb4f3a3a1ffe5326", + "warmup_time": -1 + }, + "benchmark_runner.RunnerTimeSuite.time_thread_runner": { + "code": "class RunnerTimeSuite:\n def time_thread_runner(self):\n \"\"\"IO bound pipeline\"\"\"\n catalog = create_data_catalog()\n test_pipeline = create_io_bound_pipeline()\n runner_obj = ThreadRunner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", + "min_run_count": 2, + "name": "benchmark_runner.RunnerTimeSuite.time_thread_runner", + "number": 0, + "param_names": [], + "params": [], + "repeat": 0, + "rounds": 2, + "sample_time": 0.01, + "type": "time", + "unit": "seconds", + "version": "f11dce24b5e61ce532f0c810d23916a0d0a0dd2e3f9150cc9aa1f6b20ebe4d4e", + "warmup_time": -1 + }, + "version": 2 +} \ No newline at end of file From 7e0088213e1d23daeb21269ccfbe0daea9489aa5 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 14 Oct 2024 16:48:42 +0100 Subject: [PATCH 15/30] update Signed-off-by: Nok Lam Chan --- asv.conf.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asv.conf.json b/asv.conf.json index 486ffbd4dc..3133339fc0 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -8,8 +8,8 @@ ], "matrix": { "req": { - "kedro-datasets[pandas]": [], - "numpy": [] + "kedro-datasets[pandas]": [""], + "numpy": [""] } }, "branches": [ From 66ad6c50480d384ffc0427df2d9df35c366f7284 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 16 Oct 2024 
12:22:17 +0100 Subject: [PATCH 16/30] back to kedro_benchmarks Signed-off-by: Nok Lam Chan --- .asv/results/benchmarks.json | 82 ------------------- asv.conf.json | 19 +---- kedro_benchmarks/__init__.py | 0 ...benchmark_runner.py => benchmark_dummy.py} | 71 +++++++--------- kedro_benchmarks/pyproject.toml | 15 ---- 5 files changed, 35 insertions(+), 152 deletions(-) delete mode 100644 .asv/results/benchmarks.json delete mode 100644 kedro_benchmarks/__init__.py rename kedro_benchmarks/{benchmark_runner.py => benchmark_dummy.py} (75%) delete mode 100644 kedro_benchmarks/pyproject.toml diff --git a/.asv/results/benchmarks.json b/.asv/results/benchmarks.json deleted file mode 100644 index 7ebc573527..0000000000 --- a/.asv/results/benchmarks.json +++ /dev/null @@ -1,82 +0,0 @@ -{ - "benchmark_runner.RunnerMemorySuite.mem_runners": { - "code": "class RunnerMemorySuite:\n def mem_runners(self, runner):\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = runner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", - "name": "benchmark_runner.RunnerMemorySuite.mem_runners", - "param_names": [ - "runner" - ], - "params": [ - [ - "", - "", - "" - ] - ], - "type": "memory", - "unit": "bytes", - "version": "13acda1c4dae582477fa994a0910a548a2189f858f5a89ace163cad942928337" - }, - "benchmark_runner.RunnerMemorySuite.peakmem_runners": { - "code": "class RunnerMemorySuite:\n def peakmem_runners(self, runner):\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = runner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", - "name": "benchmark_runner.RunnerMemorySuite.peakmem_runners", - "param_names": [ - "runner" - ], - "params": [ - [ - "", - "", - "" - ] - ], - "type": "peakmemory", - "unit": "bytes", - "version": "c906244a890c199414b4d50adc3d6de53c0756386563b573c66627fb492f74ad" - }, - "benchmark_runner.RunnerTimeSuite.time_parallel_runner": { - "code": "class RunnerTimeSuite:\n def time_parallel_runner(self):\n \"\"\"compute bound pipeline\"\"\"\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = ParallelRunner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", - "min_run_count": 2, - "name": "benchmark_runner.RunnerTimeSuite.time_parallel_runner", - "number": 0, - "param_names": [], - "params": [], - "repeat": 0, - "rounds": 2, - "sample_time": 0.01, - "type": "time", - "unit": "seconds", - "version": "bc652bf3bbaddc5fe5838632fb007a7a4e6e99592a452ea38987b498e989331c", - "warmup_time": -1 - }, - "benchmark_runner.RunnerTimeSuite.time_sequential_runner": { - "code": "class RunnerTimeSuite:\n def time_sequential_runner(self):\n catalog = create_data_catalog()\n test_pipeline = create_compute_bound_pipeline()\n runner_obj = SequentialRunner()\n runner_obj.run(test_pipeline, 
catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", - "min_run_count": 2, - "name": "benchmark_runner.RunnerTimeSuite.time_sequential_runner", - "number": 0, - "param_names": [], - "params": [], - "repeat": 0, - "rounds": 2, - "sample_time": 0.01, - "type": "time", - "unit": "seconds", - "version": "16b755b06d88eedad82d93a3b24dca6efd7cb45dc252319feb4f3a3a1ffe5326", - "warmup_time": -1 - }, - "benchmark_runner.RunnerTimeSuite.time_thread_runner": { - "code": "class RunnerTimeSuite:\n def time_thread_runner(self):\n \"\"\"IO bound pipeline\"\"\"\n catalog = create_data_catalog()\n test_pipeline = create_io_bound_pipeline()\n runner_obj = ThreadRunner()\n runner_obj.run(test_pipeline, catalog=catalog)\n\n def setup(self, *args, **kwargs):\n data_dir = Path(\"benchmarks/data\")\n data_dir.mkdir(exist_ok=True, parents=True)\n \n # Create a dummy csv\n with open(data_dir / \"data.csv\", \"w\") as f:\n f.write(\"col1,col2\\n1,2\\n\")", - "min_run_count": 2, - "name": "benchmark_runner.RunnerTimeSuite.time_thread_runner", - "number": 0, - "param_names": [], - "params": [], - "repeat": 0, - "rounds": 2, - "sample_time": 0.01, - "type": "time", - "unit": "seconds", - "version": "f11dce24b5e61ce532f0c810d23916a0d0a0dd2e3f9150cc9aa1f6b20ebe4d4e", - "warmup_time": -1 - }, - "version": 2 -} \ No newline at end of file diff --git a/asv.conf.json b/asv.conf.json index 3133339fc0..26a48d5f14 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,22 +3,11 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": [ - "pip install -e . ./kedro_benchmarks/ " - ], - "matrix": { - "req": { - "kedro-datasets[pandas]": [""], - "numpy": [""] - } - }, - "branches": [ - "noklam/stress-testing-runners-4127" - ], + "install_command": ["pip install -e . ./kedro_benchmarks/ kedro-datasets numpy pandas "], + "branches": ["noklam/stress-testing-runners-4127"], "environment_name": "kedro", "environment_type": "virtualenv", "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", - "html_dir": ".asv/html", - "benchmark_dir": "kedro_benchmarks" -} \ No newline at end of file + "html_dir": ".asv/html" +} diff --git a/kedro_benchmarks/__init__.py b/kedro_benchmarks/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/kedro_benchmarks/benchmark_runner.py b/kedro_benchmarks/benchmark_dummy.py similarity index 75% rename from kedro_benchmarks/benchmark_runner.py rename to kedro_benchmarks/benchmark_dummy.py index 2593e04fb7..8a48920642 100644 --- a/kedro_benchmarks/benchmark_runner.py +++ b/kedro_benchmarks/benchmark_dummy.py @@ -12,7 +12,6 @@ from kedro.io.memory_dataset import MemoryDataset from pathlib import Path - # Simulate an I/O-bound task def io_bound_task(input_data): time.sleep(2) # Simulate an I/O wait (e.g., reading from a file) @@ -98,60 +97,52 @@ def create_compute_bound_pipeline(): return dummy_pipeline -class RunnerMemorySuite: - params = [SequentialRunner, ThreadRunner, ParallelRunner] - param_names = ["runner"] - - def setup(self, *args, **kwargs): - data_dir = Path("benchmarks/data") - data_dir.mkdir(exist_ok=True, parents=True) - - # Create a dummy csv - with open(data_dir / "data.csv", "w") as f: - f.write("col1,col2\n1,2\n") +class TimeSuite: + """ + A dummy benchmark suite to test with asv framework. 
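+
+    asv discovers benchmarks by method prefix: ``time_*`` methods are timed,
+    while ``mem_*`` and ``peakmem_*`` methods measure memory usage.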
+ """ - def mem_runners(self, runner): - catalog = create_data_catalog() - test_pipeline = create_compute_bound_pipeline() - runner_obj = runner() - runner_obj.run(test_pipeline, catalog=catalog) + def setup(self): + self.d = {} + for x in range(500): + self.d[x] = None - def peakmem_runners(self, runner): - catalog = create_data_catalog() - test_pipeline = create_compute_bound_pipeline() - runner_obj = runner() - runner_obj.run(test_pipeline, catalog=catalog) + def time_keys(self): + for key in self.d.keys(): + pass +class RunnerSuite: + """ + A class to collect all runners performance test + """ + params = [SequentialRunner, ThreadRunner, ParallelRunner] + param_names = ["runner"] + timeout= 3600 -class RunnerTimeSuite: def setup(self, *args, **kwargs): + print("AAAAA", args) data_dir = Path("benchmarks/data") data_dir.mkdir(exist_ok=True, parents=True) - # Create a dummy csv - with open(data_dir / "data.csv", "w") as f: + with open(data_dir/"data.csv", "w") as f: f.write("col1,col2\n1,2\n") + print(f"all files: {list(Path().glob('**/*'))}") - def time_sequential_runner(self): + def time_runners(self,runner): + import os + print(f"Current Directory {os.getcwd()}") catalog = create_data_catalog() + print("**"*20) + print(catalog) test_pipeline = create_compute_bound_pipeline() - runner_obj = SequentialRunner() - runner_obj.run(test_pipeline, catalog=catalog) - - def time_parallel_runner(self): - """compute bound pipeline""" - catalog = create_data_catalog() - test_pipeline = create_compute_bound_pipeline() - runner_obj = ParallelRunner() + runner_obj = runner() runner_obj.run(test_pipeline, catalog=catalog) - def time_thread_runner(self): - """IO bound pipeline""" - catalog = create_data_catalog() - test_pipeline = create_io_bound_pipeline() - runner_obj = ThreadRunner() - runner_obj.run(test_pipeline, catalog=catalog) + def mem_runners(self, runner): + ... + def peakmem_runners(self, runner): + ... 
if __name__ == "__main__": suite = RunnerSuite() diff --git a/kedro_benchmarks/pyproject.toml b/kedro_benchmarks/pyproject.toml deleted file mode 100644 index 2d4dd5a68b..0000000000 --- a/kedro_benchmarks/pyproject.toml +++ /dev/null @@ -1,15 +0,0 @@ -[project] -name = "kedro_benchmarks" -version = "0.1.0" -description = "Default template for PDM package" -authors = [ - {name = "Nok Lam Chan", email = "nok.lam.chan@quantumblack.com"}, -] -dependencies = [] -requires-python = ">=3.9.0" -readme = "README.md" -license = {text = "MIT"} - - -[tool.pdm] -distribution = false From 985a05179afae96d089f86d220d93e8220d654b8 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 16 Oct 2024 12:23:31 +0100 Subject: [PATCH 17/30] rename benchmark file Signed-off-by: Nok Lam Chan --- kedro_benchmarks/{benchmark_dummy.py => benchmark_runner.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename kedro_benchmarks/{benchmark_dummy.py => benchmark_runner.py} (100%) diff --git a/kedro_benchmarks/benchmark_dummy.py b/kedro_benchmarks/benchmark_runner.py similarity index 100% rename from kedro_benchmarks/benchmark_dummy.py rename to kedro_benchmarks/benchmark_runner.py From 0f5a4f06f5c0f8bf6fad4c91d59ab947af59873b Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 16 Oct 2024 12:23:47 +0100 Subject: [PATCH 18/30] clean up Signed-off-by: Nok Lam Chan --- kedro_benchmarks/benchmark_runner.py | 73 +++++++++++++++------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/kedro_benchmarks/benchmark_runner.py b/kedro_benchmarks/benchmark_runner.py index 8a48920642..588e1ac923 100644 --- a/kedro_benchmarks/benchmark_runner.py +++ b/kedro_benchmarks/benchmark_runner.py @@ -12,6 +12,7 @@ from kedro.io.memory_dataset import MemoryDataset from pathlib import Path + # Simulate an I/O-bound task def io_bound_task(input_data): time.sleep(2) # Simulate an I/O wait (e.g., reading from a file) @@ -97,54 +98,56 @@ def create_compute_bound_pipeline(): return dummy_pipeline -class TimeSuite: - """ - A dummy benchmark suite to test with asv framework. - """ - - def setup(self): - self.d = {} - for x in range(500): - self.d[x] = None - - def time_keys(self): - for key in self.d.keys(): - pass - -class RunnerSuite: - """ - A class to collect all runners performance test - """ +class RunnerMemorySuite: params = [SequentialRunner, ThreadRunner, ParallelRunner] param_names = ["runner"] - timeout= 3600 def setup(self, *args, **kwargs): - print("AAAAA", args) data_dir = Path("benchmarks/data") data_dir.mkdir(exist_ok=True, parents=True) + # Create a dummy csv - with open(data_dir/"data.csv", "w") as f: + with open(data_dir / "data.csv", "w") as f: f.write("col1,col2\n1,2\n") - print(f"all files: {list(Path().glob('**/*'))}") - def time_runners(self,runner): - import os - print(f"Current Directory {os.getcwd()}") + def mem_runners(self, runner): catalog = create_data_catalog() - print("**"*20) - print(catalog) test_pipeline = create_compute_bound_pipeline() runner_obj = runner() runner_obj.run(test_pipeline, catalog=catalog) - def mem_runners(self, runner): - ... - def peakmem_runners(self, runner): - ... 
+ catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = runner() + runner_obj.run(test_pipeline, catalog=catalog) + -if __name__ == "__main__": - suite = RunnerSuite() - for param in suite.params: - suite.time_runners(param) +class RunnerTimeSuite: + def setup(self, *args, **kwargs): + data_dir = Path("benchmarks/data") + data_dir.mkdir(exist_ok=True, parents=True) + + # Create a dummy csv + with open(data_dir / "data.csv", "w") as f: + f.write("col1,col2\n1,2\n") + + def time_sequential_runner(self): + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = SequentialRunner() + runner_obj.run(test_pipeline, catalog=catalog) + + def time_parallel_runner(self): + """compute bound pipeline""" + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = ParallelRunner() + runner_obj.run(test_pipeline, catalog=catalog) + + def time_thread_runner(self): + """IO bound pipeline""" + catalog = create_data_catalog() + test_pipeline = create_io_bound_pipeline() + runner_obj = ThreadRunner() + runner_obj.run(test_pipeline, catalog=catalog) From ddd6a774fd41b11312f59b4a975a8fc1459c4643 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 16 Oct 2024 12:25:12 +0100 Subject: [PATCH 19/30] update asv config Signed-off-by: Nok Lam Chan --- asv.conf.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asv.conf.json b/asv.conf.json index 26a48d5f14..26f5a3d159 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,7 +3,7 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e . ./kedro_benchmarks/ kedro-datasets numpy pandas "], + "install_command": ["pip install -e . ./kedro_benchmarks/ kedro-datasets[pandas-csvdataset]"], "branches": ["noklam/stress-testing-runners-4127"], "environment_name": "kedro", "environment_type": "virtualenv", From 4f905b52b057ae842645a51fc23f8960666f946e Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 16 Oct 2024 12:25:49 +0100 Subject: [PATCH 20/30] update config Signed-off-by: Nok Lam Chan --- asv.conf.json | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/asv.conf.json b/asv.conf.json index 26f5a3d159..f48e2fcfb8 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -3,11 +3,16 @@ "project": "Kedro", "project_url": "https://kedro.org/", "repo": ".", - "install_command": ["pip install -e . ./kedro_benchmarks/ kedro-datasets[pandas-csvdataset]"], - "branches": ["noklam/stress-testing-runners-4127"], + "install_command": [ + "pip install -e . 
./kedro_benchmarks/ kedro-datasets[pandas-csvdataset]" + ], + "branches": [ + "noklam/stress-testing-runners-4127" + ], "environment_name": "kedro", "environment_type": "virtualenv", "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", - "html_dir": ".asv/html" -} + "html_dir": ".asv/html", + "benchmark_directory": "kedro_benchmarks" +} \ No newline at end of file From 515d91cd0709b855cba8292d3bc245c4f80fe208 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Wed, 16 Oct 2024 13:46:30 +0100 Subject: [PATCH 21/30] update config Signed-off-by: Nok Lam Chan --- asv.conf.json | 4 ++-- kedro_benchmarks/{src/benchmarks => }/__init__.py | 0 kedro_benchmarks/tests/__init__.py | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename kedro_benchmarks/{src/benchmarks => }/__init__.py (100%) delete mode 100644 kedro_benchmarks/tests/__init__.py diff --git a/asv.conf.json b/asv.conf.json index f48e2fcfb8..d04448e42b 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -4,7 +4,7 @@ "project_url": "https://kedro.org/", "repo": ".", "install_command": [ - "pip install -e . ./kedro_benchmarks/ kedro-datasets[pandas-csvdataset]" + "pip install -e . kedro-datasets[pandas-csvdataset]" ], "branches": [ "noklam/stress-testing-runners-4127" @@ -14,5 +14,5 @@ "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", "html_dir": ".asv/html", - "benchmark_directory": "kedro_benchmarks" + "benchmark_dir": "kedro_benchmarks" } \ No newline at end of file diff --git a/kedro_benchmarks/src/benchmarks/__init__.py b/kedro_benchmarks/__init__.py similarity index 100% rename from kedro_benchmarks/src/benchmarks/__init__.py rename to kedro_benchmarks/__init__.py diff --git a/kedro_benchmarks/tests/__init__.py b/kedro_benchmarks/tests/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 From 74d24bf53de743fe2d806e23c089288bc842d570 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Fri, 18 Oct 2024 14:51:34 +0100 Subject: [PATCH 22/30] fix memory test Signed-off-by: Nok Lam Chan --- benchmarkss/benchmark_runner.py | 154 ++++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 benchmarkss/benchmark_runner.py diff --git a/benchmarkss/benchmark_runner.py b/benchmarkss/benchmark_runner.py new file mode 100644 index 0000000000..d49c45132f --- /dev/null +++ b/benchmarkss/benchmark_runner.py @@ -0,0 +1,154 @@ +# Write the benchmarking functions here. +# See "Writing benchmarks" in the asv docs for more information. + +from kedro.pipeline import node +from kedro.pipeline.modular_pipeline import pipeline +from kedro.io.data_catalog import DataCatalog +from kedro.runner import SequentialRunner, ThreadRunner, ParallelRunner +import time +import numpy as np +import yaml +import pandas as pd +from kedro.io.memory_dataset import MemoryDataset +from pathlib import Path + + +# Simulate an I/O-bound task +def io_bound_task(input_data): + time.sleep(2) # Simulate an I/O wait (e.g., reading from a file) + output = input_data + return output + + +# Simulate a compute-bound task (matrix multiplication) +def compute_bound_task(input_data) -> str: + # Simulate heavy compute that are not using multicore (not pandas/numpy etc) + ans = 1 + for i in range(1, 50000): + ans = ans * i + return "dummy" + + +def create_data_catalog(): + """ + Use dataset factory pattern to make sure the benchmark cover the slowest path. 
+ """ + catalog_conf = """ + +'output_{pattern}': + type: pandas.CSVDataset + filepath: benchmarks/data/'{pattern}.csv' + +'numpy_{pattern}': + type: pickle.PickleDataset + filepath: benchmarks/data/'{pattern}.pkl' + +'{catch_all_dataset_pattern}': + type: pandas.CSVDataset + filepath: benchmarks/data/data.csv +""" + catalog_conf = yaml.safe_load(catalog_conf) + catalog = DataCatalog.from_config(catalog_conf) + return catalog + + +def create_io_bound_node(inputs=None, outputs=None, name=None): + io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name) + return io_node + + +def create_io_bound_pipeline(): + dummy_pipeline = pipeline( + [ + create_io_bound_node("dummy_1", "output_1"), + create_io_bound_node("dummy_2", "output_2"), + create_io_bound_node("dummy_3", "output_3"), + create_io_bound_node("dummy_4", "output_4"), + create_io_bound_node("dummy_5", "output_5"), + create_io_bound_node("dummy_6", "output_6"), + create_io_bound_node("dummy_7", "output_7"), + create_io_bound_node("dummy_1", "output_8"), + create_io_bound_node("dummy_1", "output_9"), + create_io_bound_node("dummy_1", "output_10"), + ] + ) + return dummy_pipeline + + +def create_compute_bound_node(inputs=None, outputs=None, name=None): + io_node = node(compute_bound_task, inputs=inputs, outputs=outputs, name=name) + return io_node + + +def create_compute_bound_pipeline(): + dummy_pipeline = pipeline( + [ + create_compute_bound_node("dummy_1", "numpy_1"), + create_compute_bound_node("dummy_2", "numpy_2"), + create_compute_bound_node("dummy_3", "numpy_3"), + create_compute_bound_node("dummy_4", "numpy_4"), + create_compute_bound_node("dummy_5", "numpy_5"), + create_compute_bound_node("dummy_6", "numpy_6"), + create_compute_bound_node("dummy_7", "numpy_7"), + create_compute_bound_node("dummy_1", "numpy_8"), + create_compute_bound_node("dummy_1", "numpy_9"), + create_compute_bound_node("dummy_1", "numpy_10"), + ] + ) + return dummy_pipeline + + +class RunnerMemorySuite: + params = [SequentialRunner, ThreadRunner, ParallelRunner] + param_names = ["runner"] + + def setup(self, *args, **kwargs): + data_dir = Path("benchmarks/data") + data_dir.mkdir(exist_ok=True, parents=True) + + # Create a dummy csv + with open(data_dir / "data.csv", "w") as f: + f.write("col1,col2\n1,2\n") + + def mem_runners(self, runner): + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = runner() + runner_obj.run(test_pipeline, catalog=catalog) + return catalog + + def peakmem_runners(self, runner): + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = runner() + runner_obj.run(test_pipeline, catalog=catalog) + + +class RunnerTimeSuite: + def setup(self, *args, **kwargs): + data_dir = Path("benchmarks/data") + data_dir.mkdir(exist_ok=True, parents=True) + + # Create a dummy csv + with open(data_dir / "data.csv", "w") as f: + f.write("col1,col2\n1,2\n") + + def time_sequential_runner(self): + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = SequentialRunner() + runner_obj.run(test_pipeline, catalog=catalog) + + def time_parallel_runner(self): + """compute bound pipeline""" + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = ParallelRunner() + runner_obj.run(test_pipeline, catalog=catalog) + + def time_thread_runner(self): + """IO bound pipeline""" + catalog = create_data_catalog() + test_pipeline = create_io_bound_pipeline() + runner_obj = ThreadRunner() + 
runner_obj.run(test_pipeline, catalog=catalog) From aaa9a3f80006c216ed80537685fd25af862ff85b Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 21 Oct 2024 11:29:07 +0100 Subject: [PATCH 23/30] remove memory tracking since it's not meaningful Signed-off-by: Nok Lam Chan --- benchmarkss/benchmark_runner.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/benchmarkss/benchmark_runner.py b/benchmarkss/benchmark_runner.py index d49c45132f..d2bc22ae37 100644 --- a/benchmarkss/benchmark_runner.py +++ b/benchmarkss/benchmark_runner.py @@ -20,9 +20,9 @@ def io_bound_task(input_data): return output -# Simulate a compute-bound task (matrix multiplication) +# Simulate a compute-bound task def compute_bound_task(input_data) -> str: - # Simulate heavy compute that are not using multicore (not pandas/numpy etc) + # Simulate heavy compute that use single core(not pandas/numpy etc) ans = 1 for i in range(1, 50000): ans = ans * i @@ -110,13 +110,6 @@ def setup(self, *args, **kwargs): with open(data_dir / "data.csv", "w") as f: f.write("col1,col2\n1,2\n") - def mem_runners(self, runner): - catalog = create_data_catalog() - test_pipeline = create_compute_bound_pipeline() - runner_obj = runner() - runner_obj.run(test_pipeline, catalog=catalog) - return catalog - def peakmem_runners(self, runner): catalog = create_data_catalog() test_pipeline = create_compute_bound_pipeline() From 8ee7dacc2fbd919e4b1580df249e78dc172195e2 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 21 Oct 2024 11:35:36 +0100 Subject: [PATCH 24/30] test Signed-off-by: Nok Lam Chan --- asv.conf.json | 1 - {benchmarkss => benchmarks}/benchmark_runner.py | 10 ++++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) rename {benchmarkss => benchmarks}/benchmark_runner.py (92%) diff --git a/asv.conf.json b/asv.conf.json index 3253c0be23..b2a4fdc767 100644 --- a/asv.conf.json +++ b/asv.conf.json @@ -14,7 +14,6 @@ "show_commit_url": "http://github.com/kedro-org/kedro/commit/", "results_dir": ".asv/results", "html_dir": ".asv/html", - "benchmark_dir": "kedro_benchmarks", "matrix": { "req": { "kedro-datasets[pandas]": [], diff --git a/benchmarkss/benchmark_runner.py b/benchmarks/benchmark_runner.py similarity index 92% rename from benchmarkss/benchmark_runner.py rename to benchmarks/benchmark_runner.py index d2bc22ae37..588e1ac923 100644 --- a/benchmarkss/benchmark_runner.py +++ b/benchmarks/benchmark_runner.py @@ -20,9 +20,9 @@ def io_bound_task(input_data): return output -# Simulate a compute-bound task +# Simulate a compute-bound task (matrix multiplication) def compute_bound_task(input_data) -> str: - # Simulate heavy compute that use single core(not pandas/numpy etc) + # Simulate heavy compute that are not using multicore (not pandas/numpy etc) ans = 1 for i in range(1, 50000): ans = ans * i @@ -110,6 +110,12 @@ def setup(self, *args, **kwargs): with open(data_dir / "data.csv", "w") as f: f.write("col1,col2\n1,2\n") + def mem_runners(self, runner): + catalog = create_data_catalog() + test_pipeline = create_compute_bound_pipeline() + runner_obj = runner() + runner_obj.run(test_pipeline, catalog=catalog) + def peakmem_runners(self, runner): catalog = create_data_catalog() test_pipeline = create_compute_bound_pipeline() From 05bcd5e1b25bc7bed5757cfd6b85000f743d9210 Mon Sep 17 00:00:00 2001 From: Nok Lam Chan Date: Mon, 21 Oct 2024 11:43:24 +0100 Subject: [PATCH 25/30] commit benchmark module Signed-off-by: Nok Lam Chan --- benchmarks/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create 
mode 100644 benchmarks/__init__.py

diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2

From f1a12352de92279f52a9ff960e42e1ebab32092d Mon Sep 17 00:00:00 2001
From: Nok Lam Chan
Date: Mon, 21 Oct 2024 15:48:43 +0100
Subject: [PATCH 26/30] ADD README

Signed-off-by: Nok Lam Chan
---
 benchmarks/README.md | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)
 create mode 100644 benchmarks/README.md

diff --git a/benchmarks/README.md b/benchmarks/README.md
new file mode 100644
index 0000000000..6192e1b59f
--- /dev/null
+++ b/benchmarks/README.md
@@ -0,0 +1,21 @@
+This is the benchmark repository of Kedro, which is mainly used internally.
+
+# Installation
+`pip install asv`
+
+
+# Run the benchmark
+Run this in the terminal:
+`asv run`
+
+You can also run the benchmark for specific commits or a range of commits; for details,
+check out the [official documentation](https://asv.readthedocs.io/en/stable/using.html#benchmarking).
+
+For example, `asv run main..mybranch` will run the benchmark against every commit since branching off from
+`main`.
+
+## Compare benchmarks for two commits
+Run this in the terminal:
+`asv compare v0.1 v0.2`
+
+This runs the benchmark against two different commits.
\ No newline at end of file

From 2349b9fbc44e74e60271d13c44dcff44aa600c8c Mon Sep 17 00:00:00 2001
From: Nok Lam Chan
Date: Tue, 22 Oct 2024 11:51:01 +0100
Subject: [PATCH 27/30] rename kedro_benchmarks

Signed-off-by: Nok Lam Chan
---
 benchmarks/__init__.py                     |   0
 benchmarks/benchmark_runner.py             | 153 ------------------
 {benchmarks => kedro_benchmarks}/README.md |   0
 .../benchmark_datacatalog.py               |   0
 .../benchmark_kedrodatacatalog.py          |   0
 .../benchmark_ocl.py                       |   0
 6 files changed, 153 deletions(-)
 delete mode 100644 benchmarks/__init__.py
 delete mode 100644 benchmarks/benchmark_runner.py
 rename {benchmarks => kedro_benchmarks}/README.md (100%)
 rename {benchmarks => kedro_benchmarks}/benchmark_datacatalog.py (100%)
 rename {benchmarks => kedro_benchmarks}/benchmark_kedrodatacatalog.py (100%)
 rename {benchmarks => kedro_benchmarks}/benchmark_ocl.py (100%)

diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/benchmarks/benchmark_runner.py b/benchmarks/benchmark_runner.py
deleted file mode 100644
index 588e1ac923..0000000000
--- a/benchmarks/benchmark_runner.py
+++ /dev/null
@@ -1,153 +0,0 @@
-# Write the benchmarking functions here.
-# See "Writing benchmarks" in the asv docs for more information.
-
-from kedro.pipeline import node
-from kedro.pipeline.modular_pipeline import pipeline
-from kedro.io.data_catalog import DataCatalog
-from kedro.runner import SequentialRunner, ThreadRunner, ParallelRunner
-import time
-import numpy as np
-import yaml
-import pandas as pd
-from kedro.io.memory_dataset import MemoryDataset
-from pathlib import Path
-
-
-# Simulate an I/O-bound task
-def io_bound_task(input_data):
-    time.sleep(2)  # Simulate an I/O wait (e.g., reading from a file)
-    output = input_data
-    return output
-
-
-# Simulate a compute-bound task (matrix multiplication)
-def compute_bound_task(input_data) -> str:
-    # Simulate heavy compute that are not using multicore (not pandas/numpy etc)
-    ans = 1
-    for i in range(1, 50000):
-        ans = ans * i
-    return "dummy"
-
-
-def create_data_catalog():
-    """
-    Use dataset factory pattern to make sure the benchmark cover the slowest path.
-    """
-    catalog_conf = """
-
-'output_{pattern}':
-    type: pandas.CSVDataset
-    filepath: benchmarks/data/'{pattern}.csv'
-
-'numpy_{pattern}':
-    type: pickle.PickleDataset
-    filepath: benchmarks/data/'{pattern}.pkl'
-
-'{catch_all_dataset_pattern}':
-    type: pandas.CSVDataset
-    filepath: benchmarks/data/data.csv
-"""
-    catalog_conf = yaml.safe_load(catalog_conf)
-    catalog = DataCatalog.from_config(catalog_conf)
-    return catalog
-
-
-def create_io_bound_node(inputs=None, outputs=None, name=None):
-    io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name)
-    return io_node
-
-
-def create_io_bound_pipeline():
-    dummy_pipeline = pipeline(
-        [
-            create_io_bound_node("dummy_1", "output_1"),
-            create_io_bound_node("dummy_2", "output_2"),
-            create_io_bound_node("dummy_3", "output_3"),
-            create_io_bound_node("dummy_4", "output_4"),
-            create_io_bound_node("dummy_5", "output_5"),
-            create_io_bound_node("dummy_6", "output_6"),
-            create_io_bound_node("dummy_7", "output_7"),
-            create_io_bound_node("dummy_1", "output_8"),
-            create_io_bound_node("dummy_1", "output_9"),
-            create_io_bound_node("dummy_1", "output_10"),
-        ]
-    )
-    return dummy_pipeline
-
-
-def create_compute_bound_node(inputs=None, outputs=None, name=None):
-    io_node = node(compute_bound_task, inputs=inputs, outputs=outputs, name=name)
-    return io_node
-
-
-def create_compute_bound_pipeline():
-    dummy_pipeline = pipeline(
-        [
-            create_compute_bound_node("dummy_1", "numpy_1"),
-            create_compute_bound_node("dummy_2", "numpy_2"),
-            create_compute_bound_node("dummy_3", "numpy_3"),
-            create_compute_bound_node("dummy_4", "numpy_4"),
-            create_compute_bound_node("dummy_5", "numpy_5"),
-            create_compute_bound_node("dummy_6", "numpy_6"),
-            create_compute_bound_node("dummy_7", "numpy_7"),
-            create_compute_bound_node("dummy_1", "numpy_8"),
-            create_compute_bound_node("dummy_1", "numpy_9"),
-            create_compute_bound_node("dummy_1", "numpy_10"),
-        ]
-    )
-    return dummy_pipeline
-
-
-class RunnerMemorySuite:
-    params = [SequentialRunner, ThreadRunner, ParallelRunner]
-    param_names = ["runner"]
-
-    def setup(self, *args, **kwargs):
-        data_dir = Path("benchmarks/data")
-        data_dir.mkdir(exist_ok=True, parents=True)
-
-        # Create a dummy csv
-        with open(data_dir / "data.csv", "w") as f:
-            f.write("col1,col2\n1,2\n")
-
-    def mem_runners(self, runner):
-        catalog = create_data_catalog()
-        test_pipeline = create_compute_bound_pipeline()
-        runner_obj = runner()
-        runner_obj.run(test_pipeline, catalog=catalog)
-
-    def peakmem_runners(self, runner):
-        catalog = create_data_catalog()
-        test_pipeline = create_compute_bound_pipeline()
-        runner_obj = runner()
-        runner_obj.run(test_pipeline, catalog=catalog)
-
-
-class RunnerTimeSuite:
-    def setup(self, *args, **kwargs):
-        data_dir = Path("benchmarks/data")
-        data_dir.mkdir(exist_ok=True, parents=True)
-
-        # Create a dummy csv
-        with open(data_dir / "data.csv", "w") as f:
-            f.write("col1,col2\n1,2\n")
-
-    def time_sequential_runner(self):
-        catalog = create_data_catalog()
-        test_pipeline = create_compute_bound_pipeline()
-        runner_obj = SequentialRunner()
-        runner_obj.run(test_pipeline, catalog=catalog)
-
-    def time_parallel_runner(self):
-        """compute bound pipeline"""
-        catalog = create_data_catalog()
-        test_pipeline = create_compute_bound_pipeline()
-        runner_obj = ParallelRunner()
-        runner_obj.run(test_pipeline, catalog=catalog)
-
-    def time_thread_runner(self):
-        """IO bound pipeline"""
-        catalog = create_data_catalog()
-        test_pipeline = create_io_bound_pipeline()
-        runner_obj = ThreadRunner()
-        runner_obj.run(test_pipeline, catalog=catalog)
diff --git a/benchmarks/README.md b/kedro_benchmarks/README.md
similarity index 100%
rename from benchmarks/README.md
rename to kedro_benchmarks/README.md
diff --git a/benchmarks/benchmark_datacatalog.py b/kedro_benchmarks/benchmark_datacatalog.py
similarity index 100%
rename from benchmarks/benchmark_datacatalog.py
rename to kedro_benchmarks/benchmark_datacatalog.py
diff --git a/benchmarks/benchmark_kedrodatacatalog.py b/kedro_benchmarks/benchmark_kedrodatacatalog.py
similarity index 100%
rename from benchmarks/benchmark_kedrodatacatalog.py
rename to kedro_benchmarks/benchmark_kedrodatacatalog.py
diff --git a/benchmarks/benchmark_ocl.py b/kedro_benchmarks/benchmark_ocl.py
similarity index 100%
rename from benchmarks/benchmark_ocl.py
rename to kedro_benchmarks/benchmark_ocl.py

From 1f7f053c67aa01d079c47bc1c236be9029a60761 Mon Sep 17 00:00:00 2001
From: Nok Lam Chan
Date: Tue, 22 Oct 2024 11:51:33 +0100
Subject: [PATCH 28/30] update asv config

Signed-off-by: Nok Lam Chan
---
 asv.conf.json | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/asv.conf.json b/asv.conf.json
index b2a4fdc767..6749aa87ac 100644
--- a/asv.conf.json
+++ b/asv.conf.json
@@ -13,10 +13,11 @@
     "environment_type": "virtualenv",
     "show_commit_url": "http://github.com/kedro-org/kedro/commit/",
     "results_dir": ".asv/results",
+    "benchmark_dir": "kedro_benchmarks",
     "html_dir": ".asv/html",
     "matrix": {
         "req": {
-            "kedro-datasets[pandas]": [],
+            "kedro-datasets[pandas]": []
         }
     }
-}
+}
\ No newline at end of file

From 0ba5c443c2e3160c55545fdd8e541dcc47e5d6d0 Mon Sep 17 00:00:00 2001
From: Nok Lam Chan
Date: Tue, 22 Oct 2024 14:00:43 +0100
Subject: [PATCH 29/30] lint

Signed-off-by: Nok Lam Chan
---
 asv.conf.json                        |  2 +-
 kedro_benchmarks/README.md           |  2 +-
 kedro_benchmarks/benchmark_runner.py | 19 +++++++++----------
 3 files changed, 11 insertions(+), 12 deletions(-)

diff --git a/asv.conf.json b/asv.conf.json
index 6749aa87ac..21fd243d14 100644
--- a/asv.conf.json
+++ b/asv.conf.json
@@ -20,4 +20,4 @@
             "kedro-datasets[pandas]": []
         }
     }
-}
\ No newline at end of file
+}
diff --git a/kedro_benchmarks/README.md b/kedro_benchmarks/README.md
index 6192e1b59f..6881b70738 100644
--- a/kedro_benchmarks/README.md
+++ b/kedro_benchmarks/README.md
@@ -18,4 +18,4 @@ For example, `asv run main..mybranch` will run the benchmarks against every co
 Run this in the terminal:
 `asv compare v0.1 v0.2`
 
-This compares the benchmark results of two commits.
\ No newline at end of file
+This compares the benchmark results of two commits.
diff --git a/kedro_benchmarks/benchmark_runner.py b/kedro_benchmarks/benchmark_runner.py
index 588e1ac923..669c4a644c 100644
--- a/kedro_benchmarks/benchmark_runner.py
+++ b/kedro_benchmarks/benchmark_runner.py
@@ -1,17 +1,16 @@
 # Write the benchmarking functions here.
 # See "Writing benchmarks" in the asv docs for more information.
 
-from kedro.pipeline import node
-from kedro.pipeline.modular_pipeline import pipeline
-from kedro.io.data_catalog import DataCatalog
-from kedro.runner import SequentialRunner, ThreadRunner, ParallelRunner
 import time
-import numpy as np
-import yaml
-import pandas as pd
-from kedro.io.memory_dataset import MemoryDataset
 from pathlib import Path
 
+import yaml
+
+from kedro.io.data_catalog import DataCatalog
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline
+from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner
+
 
 # Simulate an I/O-bound task
@@ -98,9 +98,9 @@ def create_compute_bound_pipeline():
 
 
 class RunnerMemorySuite:
-    params = [SequentialRunner, ThreadRunner, ParallelRunner]
-    param_names = ["runner"]
+    params = (SequentialRunner, ThreadRunner, ParallelRunner,)
+    param_names = ("runner",)
 
     def setup(self, *args, **kwargs):
         data_dir = Path("benchmarks/data")
         data_dir.mkdir(exist_ok=True, parents=True)

From f818826517590333c4da982aad1b4d426ae93659 Mon Sep 17 00:00:00 2001
From: Nok Lam Chan
Date: Mon, 28 Oct 2024 13:48:44 +0000
Subject: [PATCH 30/30] test matrix of runner

Signed-off-by: Nok Lam Chan
---
 kedro_benchmarks/benchmark_runner.py | 38 +++++++++++++++++-----------
 1 file changed, 23 insertions(+), 15 deletions(-)

diff --git a/kedro_benchmarks/benchmark_runner.py b/kedro_benchmarks/benchmark_runner.py
index 669c4a644c..c6bf485795 100644
--- a/kedro_benchmarks/benchmark_runner.py
+++ b/kedro_benchmarks/benchmark_runner.py
@@ -1,6 +1,7 @@
 # Write the benchmarking functions here.
 # See "Writing benchmarks" in the asv docs for more information.
 
+import importlib
 import time
 from pathlib import Path
 
@@ -9,7 +10,6 @@
 from kedro.io.data_catalog import DataCatalog
 from kedro.pipeline import node
 from kedro.pipeline.modular_pipeline import pipeline
-from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner
 
 
 # Simulate an I/O-bound task
@@ -98,7 +98,11 @@ def create_compute_bound_pipeline():
 
 
 class RunnerMemorySuite:
-    params = (SequentialRunner, ThreadRunner, ParallelRunner,)
+    params = (
+        "SequentialRunner",
+        "ThreadRunner",
+        "ParallelRunner",
+    )
     param_names = ("runner",)
 
     def setup(self, *args, **kwargs):
@@ -112,17 +116,26 @@ def setup(self, *args, **kwargs):
         with open(data_dir / "data.csv", "w") as f:
             f.write("col1,col2\n1,2\n")
 
     def mem_runners(self, runner):
         catalog = create_data_catalog()
         test_pipeline = create_compute_bound_pipeline()
-        runner_obj = runner()
+        runner_module = importlib.import_module("kedro.runner")
+        runner_obj = getattr(runner_module, runner)()
         runner_obj.run(test_pipeline, catalog=catalog)
 
     def peakmem_runners(self, runner):
         catalog = create_data_catalog()
         test_pipeline = create_compute_bound_pipeline()
-        runner_obj = runner()
+        runner_module = importlib.import_module("kedro.runner")
+        runner_obj = getattr(runner_module, runner)()
         runner_obj.run(test_pipeline, catalog=catalog)
 
 
 class RunnerTimeSuite:
+    params = (
+        "SequentialRunner",
+        "ThreadRunner",
+        "ParallelRunner",
+    )
+    param_names = ("runner",)
+
     def setup(self, *args, **kwargs):
         data_dir = Path("benchmarks/data")
         data_dir.mkdir(exist_ok=True, parents=True)
@@ -131,22 +144,17 @@ def setup(self, *args, **kwargs):
         with open(data_dir / "data.csv", "w") as f:
             f.write("col1,col2\n1,2\n")
 
-    def time_sequential_runner(self):
-        catalog = create_data_catalog()
-        test_pipeline = create_compute_bound_pipeline()
-        runner_obj = SequentialRunner()
-        runner_obj.run(test_pipeline, catalog=catalog)
-
-    def time_parallel_runner(self):
-        """compute bound pipeline"""
+    def time_compute_bound_runner(self, runner):
         catalog = create_data_catalog()
         test_pipeline = create_compute_bound_pipeline()
-        runner_obj = ParallelRunner()
+        runner_module = importlib.import_module("kedro.runner")
+        runner_obj = getattr(runner_module, runner)()
         runner_obj.run(test_pipeline, catalog=catalog)
 
-    def time_thread_runner(self):
+    def time_io_bound_runner(self, runner):
         """IO bound pipeline"""
         catalog = create_data_catalog()
         test_pipeline = create_io_bound_pipeline()
-        runner_obj = ThreadRunner()
+        runner_module = importlib.import_module("kedro.runner")
+        runner_obj = getattr(runner_module, runner)()
         runner_obj.run(test_pipeline, catalog=catalog)
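
A closing note on the pattern PATCH 30 settles on: `params` now holds the runner names as plain strings, and each benchmark resolves the class through `importlib` at call time, presumably so that asv can display and serialize the parameter values cleanly. Below is a minimal, self-contained sketch of that lookup. It assumes a working `kedro` installation; the `identity` function, the one-node pipeline, and the dataset names are illustrative only and are not part of the patches above.

import importlib

from kedro.io.data_catalog import DataCatalog
from kedro.io.memory_dataset import MemoryDataset
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline


def identity(x):
    return x


if __name__ == "__main__":  # the guard matters for ParallelRunner, which spawns worker processes
    # Resolve the runner class from its name, the same lookup the suites in PATCH 30 use.
    runner_name = "SequentialRunner"  # or "ThreadRunner" / "ParallelRunner"
    runner_cls = getattr(importlib.import_module("kedro.runner"), runner_name)

    # An illustrative one-node pipeline backed by in-memory datasets (hypothetical names).
    catalog = DataCatalog({"input_data": MemoryDataset("hello"), "output_data": MemoryDataset()})
    demo_pipeline = pipeline([node(identity, inputs="input_data", outputs="output_data")])

    runner_cls().run(demo_pipeline, catalog=catalog)

Keeping the parameter a string also means the asv result files and report labels carry readable names such as "SequentialRunner" rather than the repr of a class object.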