DatasetAlreadyExistsError when using dataset factories and ThreadRunner #4007

Closed
jaguirre16 opened this issue Jul 12, 2024 · 2 comments · Fixed by #4093
Labels
Issue: Bug Report 🐞 Bug that needs to be fixed

jaguirre16 commented Jul 12, 2024

Description

Kedro tries to register the same input dataset twice when that dataset is used by multiple nodes and the pipeline is run with ThreadRunner.

Context

I'm trying to use dataset factories with ThreadRunner to run independent nodes in parallel and reduce execution time. With SequentialRunner the pipeline works; with ThreadRunner it throws DatasetAlreadyExistsError.

Steps to Reproduce

Minimal code to reproduce:
catalog.yml

in-{dataset_name}:
  type: "pandas.CSVDataset"
  filepath: "data/{dataset_name}.csv"

out-{dataset_name}:
  type: "pandas.CSVDataset"
  filepath: "data/{dataset_name}.csv"

pipeline.py

from kedro.pipeline import Pipeline, pipeline, node

def func(df):
    return df

def create_pipeline() -> Pipeline:
    return pipeline([
        node(
            func,
            inputs=["in-test1"],
            outputs="out-test2"
        ),
        node(
            func,
            inputs=["in-test1"],
            outputs="out-test3"
        ),
    ])

Run the project:
kedro run -r ThreadRunner

Expected Result

Both nodes read the same input dataset without error.

Actual Result

Pipeline fails. Error log:

user@pc segcom % kedro run -r ThreadRunner
24/07/11 23:55:36 WARN Utils: Your hostname, pc.local resolves to a loopback address: 127.0.0.1; using 192.168.0.21 instead (on interface en0)
24/07/11 23:55:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/11 23:55:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
│ /Users/user/opt/anaconda3/envs/segcom/bin/kedro:8 in <module>                                │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/framework/cli/cli.p │
│ y:233 in main                                                                                    │
│                                                                                                  │
│   230 │   cli_collection = KedroCLI(                                                             │
│   231 │   │   project_path=_find_kedro_project(Path.cwd()) or Path.cwd()                         │
│   232 │   )                                                                                      │
│ ❱ 233 │   cli_collection()                                                                       │
│   234                                                                                            │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/click/core.py:1157 in     │
│ __call__                                                                                         │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/framework/cli/cli.p │
│ y:130 in main                                                                                    │
│                                                                                                  │
│   127 │   │   )                                                                                  │
│   128 │   │                                                                                      │
│   129 │   │   try:                                                                               │
│ ❱ 130 │   │   │   super().main(                                                                  │
│   131 │   │   │   │   args=args,                                                                 │
│   132 │   │   │   │   prog_name=prog_name,                                                       │
│   133 │   │   │   │   complete_var=complete_var,                                                 │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/click/core.py:1078 in     │
│ main                                                                                             │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/click/core.py:1688 in     │
│ invoke                                                                                           │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/click/core.py:1434 in     │
│ invoke                                                                                           │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/click/core.py:783 in      │
│ invoke                                                                                           │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/framework/cli/proje │
│ ct.py:225 in run                                                                                 │
│                                                                                                  │
│   222 │   with KedroSession.create(                                                              │
│   223 │   │   env=env, conf_source=conf_source, extra_params=params                              │
│   224 │   ) as session:                                                                          │
│ ❱ 225 │   │   session.run(                                                                       │
│   226 │   │   │   tags=tuple_tags,                                                               │
│   227 │   │   │   runner=runner_obj(is_async=is_async),                                          │
│   228 │   │   │   node_names=tuple_node_names,                                                   │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/framework/session/s │
│ ession.py:395 in run                                                                             │
│                                                                                                  │
│   392 │   │   )                                                                                  │
│   393 │   │                                                                                      │
│   394 │   │   try:                                                                               │
│ ❱ 395 │   │   │   run_result = runner.run(                                                       │
│   396 │   │   │   │   filtered_pipeline, catalog, hook_manager, session_id                       │
│   397 │   │   │   )                                                                              │
│   398 │   │   │   self._run_called = True                                                        │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/runner/runner.py:11 │
│ 7 in run                                                                                         │
│                                                                                                  │
│   114 │   │   │   self._logger.info(                                                             │
│   115 │   │   │   │   "Asynchronous mode is enabled for loading and saving data"                 │
│   116 │   │   │   )                                                                              │
│ ❱ 117 │   │   self._run(pipeline, catalog, hook_or_null_manager, session_id)  # type: ignore[a   │
│   118 │   │                                                                                      │
│   119 │   │   self._logger.info("Pipeline execution completed successfully.")                    │
│   120                                                                                            │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/runner/thread_runne │
│ r.py:133 in _run                                                                                 │
│                                                                                                  │
│   130 │   │   │   │   done, futures = wait(futures, return_when=FIRST_COMPLETED)                 │
│   131 │   │   │   │   for future in done:                                                        │
│   132 │   │   │   │   │   try:                                                                   │
│ ❱ 133 │   │   │   │   │   │   node = future.result()                                             │
│   134 │   │   │   │   │   except Exception:                                                      │
│   135 │   │   │   │   │   │   self._suggest_resume_scenario(pipeline, done_nodes, catalog)       │
│   136 │   │   │   │   │   │   raise                                                              │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/concurrent/futures/_base.py:451 in      │
│ result                                                                                           │
│                                                                                                  │
│   448 │   │   │   │   if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                     │
│   449 │   │   │   │   │   raise CancelledError()                                                 │
│   450 │   │   │   │   elif self._state == FINISHED:                                              │
│ ❱ 451 │   │   │   │   │   return self.__get_result()                                             │
│   452 │   │   │   │                                                                              │
│   453 │   │   │   │   self._condition.wait(timeout)                                              │
│   454                                                                                            │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/concurrent/futures/_base.py:403 in      │
│ __get_result                                                                                     │
│                                                                                                  │
│   400 │   def __get_result(self):                                                                │
│   401 │   │   if self._exception:                                                                │
│   402 │   │   │   try:                                                                           │
│ ❱ 403 │   │   │   │   raise self._exception                                                      │
│   404 │   │   │   finally:                                                                       │
│   405 │   │   │   │   # Break a reference cycle with the exception in self._exception            │
│   406 │   │   │   │   self = None                                                                │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/concurrent/futures/thread.py:58 in run  │
│                                                                                                  │
│    55 │   │   │   return                                                                         │
│    56 │   │                                                                                      │
│    57 │   │   try:                                                                               │
│ ❱  58 │   │   │   result = self.fn(*self.args, **self.kwargs)                                    │
│    59 │   │   except BaseException as exc:                                                       │
│    60 │   │   │   self.future.set_exception(exc)                                                 │
│    61 │   │   │   # Break a reference cycle with the exception 'exc'                             │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/runner/runner.py:41 │
│ 3 in run_node                                                                                    │
│                                                                                                  │
│   410 │   if is_async:                                                                           │
│   411 │   │   node = _run_node_async(node, catalog, hook_manager, session_id)                    │
│   412 │   else:                                                                                  │
│ ❱ 413 │   │   node = _run_node_sequential(node, catalog, hook_manager, session_id)               │
│   414 │                                                                                          │
│   415 │   for name in node.confirms:                                                             │
│   416 │   │   catalog.confirm(name)                                                              │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/runner/runner.py:49 │
│ 4 in _run_node_sequential                                                                        │
│                                                                                                  │
│   491 │                                                                                          │
│   492 │   for name in node.inputs:                                                               │
│   493 │   │   hook_manager.hook.before_dataset_loaded(dataset_name=name, node=node)              │
│ ❱ 494 │   │   inputs[name] = catalog.load(name)                                                  │
│   495 │   │   hook_manager.hook.after_dataset_loaded(                                            │
│   496 │   │   │   dataset_name=name, data=inputs[name], node=node                                │
│   497 │   │   )                                                                                  │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/io/data_catalog.py: │
│ 506 in load                                                                                      │
│                                                                                                  │
│   503 │   │   │   >>> df = io.load("cars")                                                       │
│   504 │   │   """                                                                                │
│   505 │   │   load_version = Version(version, None) if version else None                         │
│ ❱ 506 │   │   dataset = self._get_dataset(name, version=load_version)                            │
│   507 │   │                                                                                      │
│   508 │   │   self._logger.info(                                                                 │
│   509 │   │   │   "Loading data from [dark_orange]%s[/dark_orange] (%s)...",                     │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/io/data_catalog.py: │
│ 422 in _get_dataset                                                                              │
│                                                                                                  │
│   419 │   │   │   │   │   dataset_name,                                                          │
│   420 │   │   │   │   )                                                                          │
│   421 │   │   │                                                                                  │
│ ❱ 422 │   │   │   self.add(dataset_name, dataset)                                                │
│   423 │   │   if dataset_name not in self._datasets:                                             │
│   424 │   │   │   error_msg = f"Dataset '{dataset_name}' not found in the catalog"               │
│   425                                                                                            │
│                                                                                                  │
│ /Users/user/opt/anaconda3/envs/segcom/lib/python3.10/site-packages/kedro/io/data_catalog.py: │
│ 622 in add                                                                                       │
│                                                                                                  │
│   619 │   │   │   if replace:                                                                    │
│   620 │   │   │   │   self._logger.warning("Replacing dataset '%s'", dataset_name)               │
│   621 │   │   │   else:                                                                          │
│ ❱ 622 │   │   │   │   raise DatasetAlreadyExistsError(                                           │
│   623 │   │   │   │   │   f"Dataset '{dataset_name}' has already been registered"                │
│   624 │   │   │   │   )                                                                          │
│   625 │   │   self._datasets[dataset_name] = dataset                                             │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
DatasetAlreadyExistsError: Dataset 'in-test1' has already been registered
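
One plausible reading of the last frames above (`_get_dataset` calling `add`, which then raises) is a check-then-add race: two threads both see that `in-test1` is not yet registered, both resolve it from the factory pattern, and the second `add` fails. The sketch below is a toy model of that interleaving, not Kedro's code. `ToyCatalog`, `guard_lookup`, and `run_two_threads` are hypothetical names, the barrier exists only to force the interleaving deterministically, and guarding the lookup with a lock is just one possible remedy; whether the linked fix takes this approach is not stated in this thread.

```python
import threading


class DatasetAlreadyExistsError(Exception):
    """Stand-in for the Kedro exception of the same name."""


class ToyCatalog:
    """Hypothetical, heavily simplified catalog (not Kedro's implementation)."""

    def __init__(self, guard_lookup, barrier=None):
        self._datasets = {}
        # Re-entrant so get() can hold the lock while add() takes it again.
        self._lock = threading.RLock()
        self._guard_lookup = guard_lookup
        # Demo-only: makes both threads pass the "not registered" check
        # before either of them registers the dataset.
        self._barrier = barrier

    def add(self, name, dataset):
        with self._lock:
            if name in self._datasets:
                raise DatasetAlreadyExistsError(
                    f"Dataset '{name}' has already been registered"
                )
            self._datasets[name] = dataset

    def _lookup(self, name):
        if name not in self._datasets:      # check ...
            if self._barrier is not None:
                self._barrier.wait()
            self.add(name, object())        # ... then add (not atomic together)
        return self._datasets[name]

    def get(self, name):
        if self._guard_lookup:
            # Assumed remedy: make check-and-add a single critical section.
            with self._lock:
                return self._lookup(name)
        return self._lookup(name)


def run_two_threads(catalog, name="in-test1"):
    """Load the same dataset name from two threads and collect any errors."""
    errors = []

    def worker():
        try:
            catalog.get(name)
        except DatasetAlreadyExistsError as exc:
            errors.append(str(exc))

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors


unsafe_errors = run_two_threads(
    ToyCatalog(guard_lookup=False, barrier=threading.Barrier(2))
)
safe_errors = run_two_threads(ToyCatalog(guard_lookup=True))
print("unguarded:", unsafe_errors)
print("guarded:", safe_errors)
```

In the unguarded run exactly one of the two threads hits the error, which matches the single `DatasetAlreadyExistsError` in the log; in the guarded run the second thread simply finds the dataset already registered.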

Your Environment

* Kedro version used (`pip show kedro` or `kedro -V`): 0.19.6
* Python version used (`python -V`): 3.10.12
* Operating system and version: macOS, Darwin Kernel Version 23.5.0

@merelcht
Member

Hi @jaguirre16 , thanks for reporting this! I've been able to replicate the issue. We'll try and fix this as soon as possible.

merelcht added the "Issue: Bug Report 🐞" label on Jul 12, 2024
@merelcht
Member

Similar issue: #3739
