ThreadRunner Dataset DatasetAlreadyExistsError: Dataset has already been registered #4250

Open
noklam opened this issue Oct 22, 2024 · 2 comments
Labels: Issue: Bug Report 🐞 (Bug that needs to be fixed)


noklam commented Oct 22, 2024

Description

Originated from #4210

Context

Upon investigation, I found that this error appears to be related only to the dataset factory pattern.

  1. If the datasets are explicitly registered in the catalog, I can run without any error on every version from 0.18.12 to the current release (I didn't go earlier, since dataset factories had not been introduced yet).
  2. If a dataset factory is used, it has been breaking since 0.18.12.

The current conclusion is that this error was not introduced recently. There seems to have been a partial fix previously, but it doesn't work for my test case.

Related:

Steps to Reproduce

This uses a test similar to the one written in #4210, taken from benchmark_runner.py, for ThreadRunner.

This is the snippet that I use:

# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.

import time
from pathlib import Path

import yaml

from kedro.io.data_catalog import DataCatalog
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner


# Simulate an I/O-bound task
def io_bound_task(input_data):
    time.sleep(2)  # Simulate an I/O wait (e.g., reading from a file)
    output = input_data
    return output


# Simulate a compute-bound task (a long chain of integer multiplications)
def compute_bound_task(input_data) -> str:
    # Simulate heavy single-core compute (pure Python, not pandas/numpy etc.)
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return "dummy"


def create_data_catalog():
    """
    Use the dataset factory pattern to make sure the benchmark covers the slowest path.
    """
    catalog_conf = """

'output_{pattern}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/'{pattern}.csv'

'numpy_{pattern}':
    type: pickle.PickleDataset
    filepath: benchmarks/data/'{pattern}.pkl'

'{catch_all}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/data.csv


# dummy_1:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_8:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_2:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_3:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_4:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_5:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_6:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv

# dummy_7:
#     type: pandas.CSVDataset
#     filepath: benchmarks/data/data.csv
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    catalog = DataCatalog.from_config(catalog_conf)
    return catalog


def create_io_bound_node(inputs=None, outputs=None, name=None):
    io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name)
    return io_node


def create_io_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_io_bound_node("dummy_1", "output_1"),
            create_io_bound_node("dummy_2", "output_2"),
            create_io_bound_node("dummy_3", "output_3"),
            create_io_bound_node("dummy_4", "output_4"),
            create_io_bound_node("dummy_5", "output_5"),
            create_io_bound_node("dummy_6", "output_6"),
            create_io_bound_node("dummy_7", "output_7"),
            create_io_bound_node("dummy_1", "output_8"),
            create_io_bound_node("dummy_1", "output_9"),
            create_io_bound_node("dummy_1", "output_10"),
        ]
    )
    return dummy_pipeline


if __name__ == "__main__":
    catalog = create_data_catalog()
    test_pipeline = create_io_bound_pipeline()
    runner_obj = ThreadRunner()
    runner_obj.run(test_pipeline, catalog=catalog)

Run this multiple times to confirm that it fails (the failure is non-deterministic because it is caused by a race condition). Then uncomment the dummy_x datasets to pre-register them; with that change, the run always passes.
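To illustrate why the failure depends on timing, here is a minimal sketch of a check-then-register race. It does not use Kedro: `ToyCatalog`, its `get`/`add` methods, and the local `DatasetAlreadyExistsError` class are hypothetical stand-ins, and the `threading.Barrier` exists only to force the unlucky interleaving that otherwise happens at random.

```python
import threading


class DatasetAlreadyExistsError(Exception):
    """Local stand-in for the Kedro error of the same name."""


class ToyCatalog:
    """Hypothetical, simplified catalog. This is NOT Kedro's DataCatalog."""

    def __init__(self, barrier=None):
        self._datasets = {}
        self._add_lock = threading.Lock()
        self._barrier = barrier  # only used to force the bad interleaving

    def add(self, name, dataset):
        # Registration itself is atomic, but it refuses duplicates.
        with self._add_lock:
            if name in self._datasets:
                raise DatasetAlreadyExistsError(
                    f"Dataset '{name}' has already been registered"
                )
            self._datasets[name] = dataset

    def get(self, name):
        # Check-then-act without a lock around both steps: the racy part.
        if name not in self._datasets:
            if self._barrier is not None:
                # Both threads pass the check before either one registers.
                self._barrier.wait()
            self.add(name, f"resolved from a pattern for {name}")
        return self._datasets[name]


def run_nodes(catalog, input_names):
    """Load each input from its own thread and collect registration errors."""
    errors = []

    def load(name):
        try:
            catalog.get(name)
        except DatasetAlreadyExistsError as exc:
            errors.append(exc)

    threads = [threading.Thread(target=load, args=(n,)) for n in input_names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors


# Two nodes share the pattern-resolved input "dummy_1", as in the pipeline above.
racy = ToyCatalog(barrier=threading.Barrier(2))
print(len(run_nodes(racy, ["dummy_1", "dummy_1"])))  # 1: one thread loses the race

# Pre-registering the dataset (like uncommenting dummy_x) skips the racy branch.
safe = ToyCatalog(barrier=threading.Barrier(2))
safe.add("dummy_1", "explicit entry")
print(len(run_nodes(safe, ["dummy_1", "dummy_1"])))  # 0
```

Without the barrier, the same sketch fails only when the two threads happen to interleave between the membership check and the registration, which matches the intermittent behaviour described above.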

noklam changed the title from "ThreadRunner Dataset Datasetis registered error" to "ThreadRunner Dataset DatasetAlreadyExistsError: Dataset has already been registered" on Oct 22, 2024
noklam added the "Issue: Bug Report 🐞" label on Oct 22, 2024

noklam commented Oct 22, 2024

We discussed this in private, and the conclusion is that it's exactly the same issue that users reported before. #4093 attempted to fix it, but it only fixed the path where the user starts a KedroSession, i.e. kedro run. This is why it still breaks for the benchmark tests and for the unit tests that users create.

We agreed that the temporary fix should go into ThreadRunner. In the longer term, it may go into the catalog instead. In parallel, there is a need to list catalog entries by pattern, so this is something we need to consider for the Catalog & Runner re-design:
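As a rough illustration of what a catalog-level fix could look like, the sketch below guards the lazy "resolve and register" step with a lock. This is only one possible shape, not the agreed design: `LockedToyCatalog`, its prefix-based `patterns` mapping, and `resolve_count` are hypothetical names that do not exist in Kedro.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class LockedToyCatalog:
    """Hypothetical catalog whose lazy registration is guarded by a lock."""

    def __init__(self, patterns):
        # Maps a name prefix to a factory; a crude stand-in for factory patterns.
        self._patterns = patterns
        self._datasets = {}
        self._lock = threading.Lock()
        self.resolve_count = 0  # how many times a pattern was actually resolved

    def _resolve(self, name):
        for prefix, factory in self._patterns.items():
            if name.startswith(prefix):
                self.resolve_count += 1
                return factory(name)
        raise KeyError(name)

    def get(self, name):
        # The check and the registration happen under the same lock, so only
        # the first caller resolves the pattern; the others reuse the result.
        with self._lock:
            if name not in self._datasets:
                self._datasets[name] = self._resolve(name)
            return self._datasets[name]


catalog = LockedToyCatalog({"dummy_": lambda name: f"dataset for {name}"})
with ThreadPoolExecutor(max_workers=8) as pool:
    loaded = list(pool.map(catalog.get, ["dummy_1"] * 10))

print(catalog.resolve_count)  # 1
```

The trade-off in this sketch is that every lookup takes the lock, which is simple but serialises catalog access; a real design would need to weigh that against resolving everything up front.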

@ElenaKhaustova

Based on the solution proposed for lazy loading in #3935 (comment), we suggest moving the warm-up into AbstractRunner, just before the call to _run(), so that this logic is common to all runners:

self._run(pipeline, catalog, hook_or_null_manager, session_id) # type: ignore[arg-type]

Later, we will replace this logic with a lazy-loading warm-up, which will also be common to all runners.
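The idea of warming up in the base runner can be sketched as follows. This is a simplified model under stated assumptions, not Kedro's implementation: `ToyCatalog`, `ToyAbstractRunner`, and `ToyThreadRunner` are hypothetical classes, and a pipeline is modelled as a plain list of `(input_name, output_name)` pairs.

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor


class ToyCatalog:
    """Hypothetical stand-in for a catalog with lazily resolved datasets."""

    def __init__(self):
        self.datasets = {}

    def get(self, name):
        # Lazy registration: safe only while a single thread calls it.
        if name not in self.datasets:
            self.datasets[name] = f"data for {name}"
        return self.datasets[name]


class ToyAbstractRunner(ABC):
    def run(self, pipeline, catalog):
        # Warm-up: resolve every dataset name once, in the calling thread,
        # before any subclass gets a chance to start workers.
        for name in {name for node in pipeline for name in node}:
            catalog.get(name)
        return self._run(pipeline, catalog)

    @abstractmethod
    def _run(self, pipeline, catalog):
        ...


class ToyThreadRunner(ToyAbstractRunner):
    def _run(self, pipeline, catalog):
        def run_node(node):
            input_name, output_name = node
            # Workers only read entries that the warm-up already registered.
            return output_name, catalog.datasets[input_name]

        with ThreadPoolExecutor(max_workers=4) as pool:
            return dict(pool.map(run_node, pipeline))


pipeline = [("dummy_1", "output_1"), ("dummy_2", "output_2"), ("dummy_1", "output_8")]
results = ToyThreadRunner().run(pipeline, ToyCatalog())
print(results["output_8"])  # data for dummy_1
```

Because the warm-up lives in the base class `run()`, any other runner subclass in this sketch would get the same behaviour without repeating the loop.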
